from __future__ import absolute_import

import errno
import hashlib
import logging
import os
import re
import time

from builtins import object
from builtins import str
from collections import deque
from past.builtins import basestring
from stat import S_ISREG, ST_MODE, ST_MTIME, ST_SIZE

import arvados.config as config
import arvados.errors as errors
import arvados.events as events
import arvados.util
from arvados.retry import retry_method

from ._normalize_stream import normalize_stream
from ._ranges import Range, LocatorAndRange
from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
from .keep import KeepLocator, KeepClient
from .safeapi import ThreadSafeApiCache
from .stream import StreamReader
# Module-level logger for collection operations.
_logger = logging.getLogger('arvados.collection')


class CollectionBase(object):
    """Abstract base class shared by the writer/reader collection classes.

    NOTE(review): this view of the file is missing lines; several method
    bodies below are incomplete.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        # [method body elided in this view]

        # _my_keep(): lazily construct and cache a KeepClient sharing this
        # collection's API client and retry count.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """Get the manifest with locator hints stripped.

        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        # [several loop-body lines elided in this view]
        for line in raw.split("\n"):
            clean_fields = fields[:1] + [
                (re.sub(r'\+[^\d][^\+]*', '', x)
                 if re.match(arvados.util.keep_locator_pattern, x)
            clean += [' '.join(clean_fields), "\n"]
class _WriterFile(_FileLikeObjectBase):
    """File-like object returned by CollectionWriter.open().

    Forwards writes to the owning CollectionWriter; closing it finishes the
    current file in the writer.  NOTE(review): several method bodies are
    elided in this view.
    """

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        # The CollectionWriter that receives this file's data.
        self.dest = coll_writer

        # close(): [def line elided in this view]
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        # [body elided in this view]

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        # [body elided in this view]

    @_FileLikeObjectBase._before_close
    # flush(): [def line elided in this view]
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self.replication = (2 if replication is None else replication)
        self._keep_client = None              # created lazily by _my_keep()
        self._data_buffer = []                # pending chunks not yet in Keep
        self._data_buffer_len = 0             # total bytes in _data_buffer
        self._current_stream_files = []       # [pos, size, name] per finished file
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
    def __exit__(self, exc_type, exc_value, traceback):
        # [body elided in this view; presumably finalizes the collection]

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        if self._queued_file:
            # [dispatch calls elided in this view]
        elif self._queued_dirents:
        elif self._queued_trees:

    def _work_file(self):
        # Drain the queued file into the collection one Keep block at a time.
        buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
        # [loop / write / close-check lines elided in this view]
        self.finish_current_file()
        self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
                # [else-branch line elided in this view]
                self._queue_file(target, dirent)
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = arvados.util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        # [conditional lines elided in this view]
        self._queue_dirents(stream_name, d)
        self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
            # [else-branch line elided in this view]
            self._close_file = False
            # [filename-default check elided in this view]
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source
206 def _queue_dirents(self, stream_name, dirents):
207 assert (not self._queued_dirents), "tried to queue more than one tree"
208 self._queued_dirents = deque(sorted(dirents))
210 def _queue_tree(self, path, stream_name, max_manifest_depth):
211 self._queued_trees.append((path, stream_name, max_manifest_depth))
213 def write_file(self, source, filename=None):
214 self._queue_file(source, filename)
215 self.do_queued_work()
217 def write_directory_tree(self,
218 path, stream_name='.', max_manifest_depth=-1):
219 self._queue_tree(path, stream_name, max_manifest_depth)
220 self.do_queued_work()
    def write(self, newdata):
        # Accept bytes as-is; encode text; other iterables are handled by
        # lines elided in this view.
        if isinstance(newdata, bytes):
            # [pass-through line elided in this view]
        elif isinstance(newdata, str):
            newdata = newdata.encode()
        elif hasattr(newdata, '__iter__'):
            # [iteration lines elided in this view]
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            # [flush call elided in this view]

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        # [filename-is-None check elided in this view]
        streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        # Concatenate buffered chunks and push full Keep blocks to storage.
        data_buffer = b''.join(self._data_buffer)
        # [emptiness check / Keep put() call elided in this view]
        self._current_stream_locators.append(
                data_buffer[0:config.KEEP_BLOCK_SIZE],
                copies=self.replication))
        # Keep any tail that did not fill a whole block buffered.
        self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
        self._data_buffer_len = len(self._data_buffer[0])
276 def start_new_file(self, newfilename=None):
277 self.finish_current_file()
278 self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        # Manifest file names may not contain whitespace or NUL characters.
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                # [format argument elided in this view]
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                # [format argument elided in this view]
        self._current_file_name = newfilename
291 def current_file_name(self):
292 return self._current_file_name
    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # [early-return line elided in this view]
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        # Record [pos, size, name] for the finished file and reset per-file
        # state so the next file starts at the current stream offset.
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None
311 def start_new_stream(self, newstreamname='.'):
312 self.finish_current_stream()
313 self.set_current_stream_name(newstreamname)
    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace: '%s'" %
                # [format argument elided in this view]
        # The empty name means the root stream, spelled '.' in manifests.
        self._current_stream_name = '.' if newstreamname=='' else newstreamname
322 def current_stream_name(self):
323 return self._current_stream_name
    def finish_current_stream(self):
        self.finish_current_file()
        # [flush_data call elided in this view]
        if not self._current_stream_files:
            # [no-op branch elided in this view]
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
            # [else-branch opening elided in this view]
            # A stream with no data still needs one locator: the empty block.
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        # Reset per-stream state for the next stream.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

        # [enclosing def line elided in this view]
        """Store the manifest in Keep and return its locator.

        This is useful for storing manifest fragments (task outputs)
        temporarily in Keep during a Crunch job.

        In other cases you should make a collection instead, by
        sending manifest_text() to the API server's "create
        collection" endpoint.
        """
        return self._my_keep().put(self.manifest_text().encode(),
                                   copies=self.replication)
360 def portable_data_hash(self):
361 stripped = self.stripped_manifest().encode()
362 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
    def manifest_text(self):
        self.finish_current_stream()
        # [manifest accumulator initialization elided in this view]
        for stream in self._finished_streams:
            # Stream names outside '.' get an explicit './' prefix
            # [prefixing line elided in this view].
            if not re.search(r'^\.(/.*)?$', stream[0]):
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            # [newline append / return elided in this view]

    def data_locators(self):
        # [result-list initialization elided in this view]
        for name, locators, files in self._finished_streams:
            # [accumulation / return elided in this view]


class ResumableCollectionWriter(CollectionWriter):
    # State attributes serialized by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']
392 def __init__(self, api_client=None, **kwargs):
393 self._dependencies = {}
394 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
    def from_state(cls, state, *init_args, **init_kwargs):
        # NOTE(review): the @classmethod decorator line appears to be elided
        # in this view (the first parameter is `cls`).
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # necessary (e.g. JSON round-trips turn deques into lists).
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            # [try: elided in this view]
            writer._queued_file = open(path, 'rb')
            writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
            # [return writer elided in this view]

    def check_dependencies(self):
        # Verify every recorded dependency still looks like the same file.
        for path, orig_stat in list(self._dependencies.items()):
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            # [try: elided in this view]
            now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        # Snapshot all resumable state; copy_func may deep-copy values.
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
            # [else: elided in this view]
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        # [return state elided in this view]

    def _queue_file(self, source, filename=None):
        # [try: elided in this view]
        src_path = os.path.realpath(source)
        # [except-clause line elided in this view]
            raise errors.AssertionError("{} not a file path".format(source))
        # [try: elided in this view]
        path_stat = os.stat(src_path)
        except OSError as stat_error:
            # [path_stat = None elided in this view]
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
            # [else: elided in this view]
            self._dependencies[src_path] = tuple(fd_stat)
477 def write(self, data):
478 if self._queued_file is None:
479 raise errors.AssertionError(
480 "resumable writer can't accept unsourced data")
481 return super(ResumableCollectionWriter, self).write(data)
# [companion constants (e.g. ADD/DEL/MOD/TOK/FILE) elided in this view]
COLLECTION = "collection"


class RichCollectionBase(CollectionBase):
    """Base class for Collections and Subcollections.

    Implements the majority of functionality relating to accessing items in the
    Collection.
    """

    def __init__(self, parent=None):
        # [parent/_items initialization lines elided in this view]
        self._committed = False
        self._callback = None

    # Abstract hooks below; concrete subclasses must override them.
    # [several def lines elided in this view]
        raise NotImplementedError()

        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

        raise NotImplementedError()

    def root_collection(self):
        raise NotImplementedError()

    def notify(self, event, collection, name, item):
        raise NotImplementedError()

    def stream_name(self):
        raise NotImplementedError()
    def find_or_create(self, path, create_type):
        """Recursively search the specified file path.

        May return either a `Collection` or `ArvadosFile`.  If not found, will
        create a new item at the specified path based on `create_type`.  Will
        create intermediate subcollections needed to contain the final item in
        the path.

        :create_type:
          One of `arvados.collection.FILE` or
          `arvados.collection.COLLECTION`.  If the path is not found, and value
          of create_type is FILE then create and return a new ArvadosFile for
          the last path component.  If COLLECTION, then create and return a new
          Collection for the last path component.
        """
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                # [item-is-None check elided in this view]
                if create_type == COLLECTION:
                    item = Subcollection(self, pathcomponents[0])
                    # [else branch elided in this view]
                    item = ArvadosFile(self, pathcomponents[0])
                self._items[pathcomponents[0]] = item
                self.set_committed(False)
                self.notify(ADD, self, pathcomponents[0], item)
                # [return item / else branch elided in this view]
                # create new collection
                item = Subcollection(self, pathcomponents[0])
                self._items[pathcomponents[0]] = item
                self.set_committed(False)
                self.notify(ADD, self, pathcomponents[0], item)
            if isinstance(item, RichCollectionBase):
                return item.find_or_create(pathcomponents[1], create_type)
                # [else: elided in this view]
                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

    def find(self, path):
        """Recursively search the specified file path.

        May return either a Collection or ArvadosFile.  Return None if not
        found.
        If path is invalid (ex: starts with '/'), an IOError exception will be
        raised.
        """
        # [empty-path check elided in this view]
        raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        if pathcomponents[0] == '':
            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

        item = self._items.get(pathcomponents[0])
        # [item-is-None check elided in this view]
        elif len(pathcomponents) == 1:
            # [return item elided in this view]
            if isinstance(item, RichCollectionBase):
                if pathcomponents[1]:
                    return item.find(pathcomponents[1])
                    # [else: return item elided in this view]
                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
605 def mkdirs(self, path):
606 """Recursive subcollection create.
608 Like `os.makedirs()`. Will create intermediate subcollections needed
609 to contain the leaf subcollection path.
613 if self.find(path) != None:
614 raise IOError(errno.EEXIST, "Directory or file exists", path)
616 return self.find_or_create(path, COLLECTION)
    def open(self, path, mode="r"):
        """Open a file-like object for access.

        :path:
          path to a file in the collection

        :mode:
          a string consisting of "r", "w", or "a", optionally followed
          by "b" or "t", optionally followed by "+".

          "b":
            binary mode: write() accepts bytes, read() returns bytes.

          "t":
            text mode (default): write() accepts strings, read() returns strings.

          "r+":
            opens for reading and writing.  Reads/writes share a file pointer.

          "w", "w+":
            truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.

          "a", "a+":
            opens for reading and writing.  All writes are appended to
            the end of the file.  Writing does not affect the file pointer for
            reading.
        """
        if not re.search(r'^[rwa][bt]?\+?$', mode):
            raise errors.ArgumentError("Invalid mode {!r}".format(mode))

        if mode[0] == 'r' and '+' not in mode:
            fclass = ArvadosFileReader
            arvfile = self.find(path)
        elif not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")
            # [else: elided in this view]
            fclass = ArvadosFileWriter
            arvfile = self.find_or_create(path, FILE)

        # [arvfile-is-None check elided in this view]
            raise IOError(errno.ENOENT, "File not found", path)
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Is a directory", path)

        # [mode-dependent truncate/seek lines elided in this view]
        return fclass(arvfile, mode=mode, num_retries=self.num_retries)

    # modified(): [def line elided in this view]
        """Determine if the collection has been modified since last committed."""
        return not self.committed()

    # committed(): [def line elided in this view]
        """Determine if the collection has been committed to the API server."""
        return self._committed

    def set_committed(self, value=True):
        """Recursively set committed flag.

        If value is True, set committed to be True for this and all children.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            # [early return / `if value:` elided in this view]
            for k,v in list(self._items.items()):
                v.set_committed(True)
            self._committed = True
            # [else: elided in this view]
            self._committed = False
            if self.parent is not None:
                self.parent.set_committed(False)

    # __iter__(): [def line elided in this view]
        """Iterate over names of files and collections contained in this collection."""
        return iter(list(self._items.keys()))
698 def __getitem__(self, k):
699 """Get a file or collection that is directly contained by this collection.
701 If you want to search a path, use `find()` instead.
704 return self._items[k]
707 def __contains__(self, k):
708 """Test if there is a file or collection a directly contained by this collection."""
709 return k in self._items
713 """Get the number of items directly contained in this collection."""
714 return len(self._items)
718 def __delitem__(self, p):
719 """Delete an item by name which is directly contained by this collection."""
721 self.set_committed(False)
722 self.notify(DEL, self, p, None)
726 """Get a list of names of files and collections directly contained in this collection."""
727 return list(self._items.keys())
731 """Get a list of files and collection objects directly contained in this collection."""
732 return list(self._items.values())
736 """Get a list of (name, object) tuples directly contained in this collection."""
737 return list(self._items.items())
739 def exists(self, path):
740 """Test if there is a file or collection at `path`."""
741 return self.find(path) is not None
    def remove(self, path, recursive=False):
        """Remove the file or subcollection (directory) at `path`.

        :recursive:
          Specify whether to remove non-empty subcollections (True), or raise an error (False).
        """
        # [empty-path check elided in this view]
        raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        # [item-is-None check elided in this view]
        raise IOError(errno.ENOENT, "File not found", path)
        if len(pathcomponents) == 1:
            # Refuse to delete a non-empty subcollection unless recursive.
            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
            deleteditem = self._items[pathcomponents[0]]
            del self._items[pathcomponents[0]]
            self.set_committed(False)
            self.notify(DEL, self, pathcomponents[0], deleteditem)
            # [else: recurse into subcollection; elided in this view]
            item.remove(pathcomponents[1])
769 def _clonefrom(self, source):
770 for k,v in list(source.items()):
771 self._items[k] = v.clone(self, k)
    # writable(): [def line elided in this view]
        raise NotImplementedError()

    def add(self, source_obj, target_name, overwrite=False, reparent=False):
        """Copy or move a file or subcollection to this collection.

        :source_obj:
          An ArvadosFile, or Subcollection object

        :target_name:
          Destination item name.  If the target name already exists and is a
          file, this will raise an error unless you specify `overwrite=True`.

        :overwrite:
          Whether to overwrite target file if it already exists.

        :reparent:
          If True, source_obj will be moved from its parent collection to this collection.
          If False, source_obj will be copied and the parent collection will be
          unmodified.
        """
        # [target_name validation elided in this view]
        if target_name in self and not overwrite:
            raise IOError(errno.EEXIST, "File already exists", target_name)

        # [modified_from initialization elided in this view]
        if target_name in self:
            modified_from = self[target_name]

        # Actually make the move or copy.
        # [reparent branch elided in this view]
            source_obj._reparent(self, target_name)
            # [item assignment / else branch elided in this view]
            item = source_obj.clone(self, target_name)

        self._items[target_name] = item
        self.set_committed(False)
        # [modified_from check elided in this view]
            self.notify(MOD, self, target_name, (modified_from, item))
            # [else: elided in this view]
            self.notify(ADD, self, target_name, item)

    def _get_src_target(self, source, target_path, source_collection, create_dest):
        if source_collection is None:
            source_collection = self

        # Resolve the source: a path string is looked up in source_collection.
        if isinstance(source, basestring):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError(errno.ENOENT, "File not found", source)
            sourcecomponents = source.split("/")
            # [else: source already an object; elided in this view]
            sourcecomponents = None

        # Find parent collection the target path
        targetcomponents = target_path.split("/")

        # Determine the name to use.
        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]

        # [target_name validation elided in this view]
        raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")

        # [create_dest branch elided in this view]
        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)

        if len(targetcomponents) > 1:
            target_dir = self.find("/".join(targetcomponents[0:-1]))
            # [else: elided in this view]

        if target_dir is None:
            raise IOError(errno.ENOENT, "Target directory not found", target_name)

        # Copying into an existing subcollection keeps the source's own name.
        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        return (source_obj, target_dir, target_name)
862 def copy(self, source, target_path, source_collection=None, overwrite=False):
863 """Copy a file or subcollection to a new path in this collection.
866 A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
869 Destination file or path. If the target path already exists and is a
870 subcollection, the item will be placed inside the subcollection. If
871 the target path already exists and is a file, this will raise an error
872 unless you specify `overwrite=True`.
875 Collection to copy `source_path` from (default `self`)
878 Whether to overwrite target file if it already exists.
881 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
882 target_dir.add(source_obj, target_name, overwrite, False)
886 def rename(self, source, target_path, source_collection=None, overwrite=False):
887 """Move a file or subcollection from `source_collection` to a new path in this collection.
890 A string with a path to source file or subcollection.
893 Destination file or path. If the target path already exists and is a
894 subcollection, the item will be placed inside the subcollection. If
895 the target path already exists and is a file, this will raise an error
896 unless you specify `overwrite=True`.
899 Collection to copy `source_path` from (default `self`)
902 Whether to overwrite target file if it already exists.
905 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
906 if not source_obj.writable():
907 raise IOError(errno.EROFS, "Source collection is read only", source)
908 target_dir.add(source_obj, target_name, overwrite, True)
910 def portable_manifest_text(self, stream_name="."):
911 """Get the manifest text for this collection, sub collections and files.
913 This method does not flush outstanding blocks to Keep. It will return
914 a normalized manifest with access tokens stripped.
917 Name to use for this stream (directory)
920 return self._get_manifest_text(stream_name, True, True)
923 def manifest_text(self, stream_name=".", strip=False, normalize=False,
924 only_committed=False):
925 """Get the manifest text for this collection, sub collections and files.
927 This method will flush outstanding blocks to Keep. By default, it will
928 not normalize an unmodified manifest or strip access tokens.
931 Name to use for this stream (directory)
934 If True, remove signing tokens from block locators if present.
935 If False (default), block locators are left unchanged.
938 If True, always export the manifest text in normalized form
939 even if the Collection is not modified. If False (default) and the collection
940 is not modified, return the original manifest text even if it is not
944 If True, don't commit pending blocks.
948 if not only_committed:
949 self._my_block_manager().commit_all()
950 return self._get_manifest_text(stream_name, strip, normalize,
951 only_committed=only_committed)
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.
        """
        if not self.committed() or self._manifest_text is None or normalize:
            # [stream/buf accumulator initialization elided in this view]
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `k`
                arvfile = self[filename]
                # [filestream initialization elided in this view]
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # [only_committed continue / locator swap elided in this view]
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                        # [`if strip:` elided in this view]
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                         segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            # [`if stream:` elided in this view]
            buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
            # [return of joined buf / else-if strip branch elided in this view]
                return self.stripped_manifest()
                # [else: elided in this view]
                return self._manifest_text

    def diff(self, end_collection, prefix=".", holding_collection=None):
        """Generate list of add/modify/delete actions.

        When given to `apply`, will change `self` to match `end_collection`
        """
        # [changes-list initialization elided in this view]
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        # [`for k in self:` elided in this view]
            if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
            # [`if k in self:` elided in this view]
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                elif end_collection[k] != self[k]:
                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                    # [else TOK branch elided in this view]
                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                # [else ADD branch elided in this view]
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
        # [return changes elided in this view]

    def apply(self, changes):
        """Apply changes from `diff`.

        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.
        """
        # [non-empty changes check elided in this view]
        self.set_committed(False)
        for change in changes:
            event_type = change[0]
            # [path/initial unpacking elided in this view]
            local = self.find(path)
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                # [time.localtime() argument elided in this view]
            if event_type == ADD:
                # [local-is-None check elided in this view]
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                # [`final = change[3]` elided in this view]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                        # [else: elided in this view]
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                    # [else: elided in this view]
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
1079 def portable_data_hash(self):
1080 """Get the portable data hash for this collection's manifest."""
1081 if self._manifest_locator and self.committed():
1082 # If the collection is already saved on the API server, and it's committed
1083 # then return API server's PDH response.
1084 return self._portable_data_hash
1086 stripped = self.portable_manifest_text().encode()
1087 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1090 def subscribe(self, callback):
1091 if self._callback is None:
1092 self._callback = callback
1094 raise errors.ArgumentError("A callback is already set on this collection.")
1097 def unsubscribe(self):
1098 if self._callback is not None:
1099 self._callback = None
1102 def notify(self, event, collection, name, item):
1104 self._callback(event, collection, name, item)
1105 self.root_collection().notify(event, collection, name, item)
1108 def __eq__(self, other):
1111 if not isinstance(other, RichCollectionBase):
1113 if len(self._items) != len(other):
1115 for k in self._items:
1118 if self._items[k] != other[k]:
1122 def __ne__(self, other):
1123 return not self.__eq__(other)
1127 """Flush bufferblocks to Keep."""
1128 for e in list(self.values()):
class Collection(RichCollectionBase):
    """Represents the root of an Arvados Collection.

    This class is threadsafe. The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the entire
    collection).

    Brief summary of useful methods:

    :To read an existing file:
      `c.open("myfile", "r")`

    :To write a new file:
      `c.open("myfile", "w")`

    :To determine if a file exists:
      `c.find("myfile") is not None`

    :To copy a file:
      `c.copy("source", "dest")`

    :To delete a file:
      `c.remove("myfile")`

    :To save to an existing collection record:
      `c.save()`

    :To save a new collection record:
      `c.save_new()`

    :To merge remote changes into this object:
      `c.update()`

    Must be associated with an API server Collection record (during
    initialization, or using `save_new`) to use `save` or `update`.

    """
1171 def __init__(self, manifest_locator_or_text=None,
1178 replication_desired=None,
1180 """Collection constructor.
1182 :manifest_locator_or_text:
1183 One of Arvados collection UUID, block locator of
1184 a manifest, raw manifest text, or None (to create an empty collection).
1186 the parent Collection, may be None.
1189 A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1190 Prefer this over supplying your own api_client and keep_client (except in testing).
1191 Will use default config settings if not specified.
1194 The API client object to use for requests. If not specified, create one using `apiconfig`.
1197 the Keep client to use for requests. If not specified, create one using `apiconfig`.
1200 the number of retries for API and Keep requests.
1203 the block manager to use. If not specified, create one.
1205 :replication_desired:
1206 How many copies should Arvados maintain. If None, API server default
1207 configuration applies. If not None, this value will also be used
1208 for determining the number of block copies being written.
1211 super(Collection, self).__init__(parent)
1212 self._api_client = api_client
1213 self._keep_client = keep_client
1214 self._block_manager = block_manager
1215 self.replication_desired = replication_desired
1216 self.put_threads = put_threads
1219 self._config = apiconfig
1221 self._config = config.settings()
1223 self.num_retries = num_retries if num_retries is not None else 0
1224 self._manifest_locator = None
1225 self._manifest_text = None
1226 self._portable_data_hash = None
1227 self._api_response = None
1228 self._past_versions = set()
1230 self.lock = threading.RLock()
1233 if manifest_locator_or_text:
1234 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1235 self._manifest_locator = manifest_locator_or_text
1236 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1237 self._manifest_locator = manifest_locator_or_text
1238 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1239 self._manifest_text = manifest_locator_or_text
1241 raise errors.ArgumentError(
1242 "Argument to CollectionReader is not a manifest or a collection UUID")
1246 except (IOError, errors.SyntaxError) as e:
1247 raise errors.ArgumentError("Error processing manifest text: %s", e)
1249 def root_collection(self):
1252 def stream_name(self):
1259 def known_past_version(self, modified_at_and_portable_data_hash):
1260 return modified_at_and_portable_data_hash in self._past_versions
1264 def update(self, other=None, num_retries=None):
1265 """Merge the latest collection on the API server with the current collection."""
1268 if self._manifest_locator is None:
1269 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1270 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1271 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1272 response.get("portable_data_hash") != self.portable_data_hash()):
1273 # The record on the server is different from our current one, but we've seen it before,
1274 # so ignore it because it's already been merged.
1275 # However, if it's the same as our current record, proceed with the update, because we want to update
1279 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1280 other = CollectionReader(response["manifest_text"])
1281 baseline = CollectionReader(self._manifest_text)
1282 self.apply(baseline.diff(other))
1283 self._manifest_text = self.manifest_text()
1287 if self._api_client is None:
1288 self._api_client = ThreadSafeApiCache(self._config)
1289 if self._keep_client is None:
1290 self._keep_client = self._api_client.keep
1291 return self._api_client
1295 if self._keep_client is None:
1296 if self._api_client is None:
1299 self._keep_client = KeepClient(api_client=self._api_client)
1300 return self._keep_client
1303 def _my_block_manager(self):
1304 if self._block_manager is None:
1305 copies = (self.replication_desired or
1306 self._my_api()._rootDesc.get('defaultCollectionReplication',
1308 self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1309 return self._block_manager
1311 def _remember_api_response(self, response):
1312 self._api_response = response
1313 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1315 def _populate_from_api_server(self):
1316 # As in KeepClient itself, we must wait until the last
1317 # possible moment to instantiate an API client, in order to
1318 # avoid tripping up clients that don't have access to an API
1319 # server. If we do build one, make sure our Keep client uses
1320 # it. If instantiation fails, we'll fall back to the except
1321 # clause, just like any other Collection lookup
1322 # failure. Return an exception, or None if successful.
1324 self._remember_api_response(self._my_api().collections().get(
1325 uuid=self._manifest_locator).execute(
1326 num_retries=self.num_retries))
1327 self._manifest_text = self._api_response['manifest_text']
1328 self._portable_data_hash = self._api_response['portable_data_hash']
1329 # If not overriden via kwargs, we should try to load the
1330 # replication_desired from the API server
1331 if self.replication_desired is None:
1332 self.replication_desired = self._api_response.get('replication_desired', None)
1334 except Exception as e:
1337 def _populate_from_keep(self):
1338 # Retrieve a manifest directly from Keep. This has a chance of
1339 # working if [a] the locator includes a permission signature
1340 # or [b] the Keep services are operating in world-readable
1341 # mode. Return an exception, or None if successful.
1343 self._manifest_text = self._my_keep().get(
1344 self._manifest_locator, num_retries=self.num_retries).decode()
1345 except Exception as e:
1348 def _populate(self):
1349 if self._manifest_locator is None and self._manifest_text is None:
1351 error_via_api = None
1352 error_via_keep = None
1353 should_try_keep = ((self._manifest_text is None) and
1354 arvados.util.keep_locator_pattern.match(
1355 self._manifest_locator))
1356 if ((self._manifest_text is None) and
1357 arvados.util.signed_locator_pattern.match(self._manifest_locator)):
1358 error_via_keep = self._populate_from_keep()
1359 if self._manifest_text is None:
1360 error_via_api = self._populate_from_api_server()
1361 if error_via_api is not None and not should_try_keep:
1363 if ((self._manifest_text is None) and
1364 not error_via_keep and
1366 # Looks like a keep locator, and we didn't already try keep above
1367 error_via_keep = self._populate_from_keep()
1368 if self._manifest_text is None:
1370 raise errors.NotFoundError(
1371 ("Failed to retrieve collection '{}' " +
1372 "from either API server ({}) or Keep ({})."
1374 self._manifest_locator,
1378 self._baseline_manifest = self._manifest_text
1379 self._import_manifest(self._manifest_text)
1382 def _has_collection_uuid(self):
1383 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1385 def __enter__(self):
1388 def __exit__(self, exc_type, exc_value, traceback):
1389 """Support scoped auto-commit in a with: block."""
1390 if exc_type is None:
1391 if self.writable() and self._has_collection_uuid():
1395 def stop_threads(self):
1396 if self._block_manager is not None:
1397 self._block_manager.stop_threads()
    def manifest_locator(self):
        """Get the manifest locator, if any.

        The manifest locator will be set when the collection is loaded from an
        API server record or the portable data hash of a manifest.

        The manifest locator will be None if the collection is newly created or
        was created directly from manifest text. The method `save_new()` will
        assign a manifest locator.

        """
        return self._manifest_locator
1414 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1415 if new_config is None:
1416 new_config = self._config
1418 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1420 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1422 newcollection._clonefrom(self)
1423 return newcollection
    def api_response(self):
        """Returns information about this Collection fetched from the API server.

        If the Collection exists in Keep but not the API server, currently
        returns None. Future versions may provide a synthetic response.

        """
        # Cached record from the most recent API call, set by _remember_api_response.
        return self._api_response
1435 def find_or_create(self, path, create_type):
1436 """See `RichCollectionBase.find_or_create`"""
1440 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1442 def find(self, path):
1443 """See `RichCollectionBase.find`"""
1447 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1449 def remove(self, path, recursive=False):
1450 """See `RichCollectionBase.remove`"""
1452 raise errors.ArgumentError("Cannot remove '.'")
1454 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1459 def save(self, merge=True, num_retries=None):
1460 """Save collection to an existing collection record.
1462 Commit pending buffer blocks to Keep, merge with remote record (if
1463 merge=True, the default), and update the collection record. Returns
1464 the current manifest text.
1466 Will raise AssertionError if not associated with a collection record on
1467 the API server. If you want to save a manifest to Keep only, see
1471 Update and merge remote changes before saving. Otherwise, any
1472 remote changes will be ignored and overwritten.
1475 Retry count on API calls (if None, use the collection default)
1478 if not self.committed():
1479 if not self._has_collection_uuid():
1480 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
1482 self._my_block_manager().commit_all()
1487 text = self.manifest_text(strip=False)
1488 self._remember_api_response(self._my_api().collections().update(
1489 uuid=self._manifest_locator,
1490 body={'manifest_text': text}
1492 num_retries=num_retries))
1493 self._manifest_text = self._api_response["manifest_text"]
1494 self._portable_data_hash = self._api_response["portable_data_hash"]
1495 self.set_committed(True)
1497 return self._manifest_text
1503 def save_new(self, name=None,
1504 create_collection_record=True,
1506 ensure_unique_name=False,
1508 """Save collection to a new collection record.
1510 Commit pending buffer blocks to Keep and, when create_collection_record
1511 is True (default), create a new collection record. After creating a
1512 new collection record, this Collection object will be associated with
1513 the new record used by `save()`. Returns the current manifest text.
1516 The collection name.
1518 :create_collection_record:
1519 If True, create a collection record on the API server.
1520 If False, only commit blocks to Keep and return the manifest text.
1523 the user, or project uuid that will own this collection.
1524 If None, defaults to the current user.
1526 :ensure_unique_name:
1527 If True, ask the API server to rename the collection
1528 if it conflicts with a collection with the same name and owner. If
1529 False, a name conflict will result in an error.
1532 Retry count on API calls (if None, use the collection default)
1535 self._my_block_manager().commit_all()
1536 text = self.manifest_text(strip=False)
1538 if create_collection_record:
1540 name = "New collection"
1541 ensure_unique_name = True
1543 body = {"manifest_text": text,
1545 "replication_desired": self.replication_desired}
1547 body["owner_uuid"] = owner_uuid
1549 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1550 text = self._api_response["manifest_text"]
1552 self._manifest_locator = self._api_response["uuid"]
1553 self._portable_data_hash = self._api_response["portable_data_hash"]
1555 self._manifest_text = text
1556 self.set_committed(True)
    def _import_manifest(self, manifest_text):
        """Import a manifest into a `Collection`.

        :manifest_text:
          The manifest text to import from.

        """
        # NOTE(review): the emptiness guard (`if len(self) > 0:`) appears to be
        # elided before this raise in this copy -- confirm before running.
        # NOTE(review): `ArgumentError` is unqualified here; elsewhere this file
        # raises `errors.ArgumentError`. As written this would raise NameError.
        raise ArgumentError("Can only import manifest into an empty collection")

        # A small state machine walks each manifest stream line through
        # STREAM_NAME -> BLOCKS -> SEGMENTS states, one whitespace-separated
        # token at a time.
        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
            tok = token_and_separator.group(1)
            sep = token_and_separator.group(2)

            if state == STREAM_NAME:
                # starting a new stream
                stream_name = tok.replace('\\040', ' ')
                self.find_or_create(stream_name, COLLECTION)

                # Block locator tokens look like "<md5>+<size>[+hint...]".
                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                blocksize = int(block_locator.group(1))
                blocks.append(Range(tok, streamoffset, blocksize, 0))
                streamoffset += blocksize

            if state == SEGMENTS:
                # File segment tokens look like "<position>:<size>:<name>".
                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
                pos = int(file_segment.group(1))
                size = int(file_segment.group(2))
                name = file_segment.group(3).replace('\\040', ' ')
                filepath = os.path.join(stream_name, name)
                afile = self.find_or_create(filepath, FILE)
                if isinstance(afile, ArvadosFile):
                    afile.add_segment(blocks, pos, size)
                    # NOTE(review): `else:` guards appear elided before these raises.
                    raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)

        self.set_committed(True)
1624 def notify(self, event, collection, name, item):
1626 self._callback(event, collection, name, item)
class Subcollection(RichCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record.

    Subcollection locking falls under the umbrella lock of its root collection.

    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        self.lock = self.root_collection().lock
        self._manifest_text = None
        # `name` is read by stream_name() and _reparent().
        self.name = name
        self.num_retries = parent.num_retries

    def root_collection(self):
        return self.parent.root_collection()

    def writable(self):
        return self.root_collection().writable()

    def _my_api(self):
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def stream_name(self):
        return os.path.join(self.parent.stream_name(), self.name)

    def clone(self, new_parent, new_name):
        """Return a deep copy of this subcollection under `new_parent`."""
        c = Subcollection(new_parent, new_name)
        c._clonefrom(self)
        return c

    def _reparent(self, newparent, newname):
        """Detach this subcollection from its parent and re-attach under a new one."""
        self.set_committed(False)
        # Flush pending writes before detaching -- TODO confirm against upstream.
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock
1679 class CollectionReader(Collection):
1680 """A read-only collection object.
1682 Initialize from an api collection record locator, a portable data hash of a
1683 manifest, or raw manifest text. See `Collection` constructor for detailed
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        """Initialize the reader; see the `Collection` constructor for arguments."""
        # _in_init makes writable() temporarily true so the base-class
        # constructor can populate the collection before it becomes read-only.
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False
        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()
        # Backwards compatability with old CollectionReader
        # all_streams() and all_files()
        self._streams = None
1700 return self._in_init
1702 def _populate_streams(orig_func):
1703 @functools.wraps(orig_func)
1704 def populate_streams_wrapper(self, *args, **kwargs):
1705 # Defer populating self._streams until needed since it creates a copy of the manifest.
1706 if self._streams is None:
1707 if self._manifest_text:
1708 self._streams = [sline.split()
1709 for sline in self._manifest_text.split("\n")
1713 return orig_func(self, *args, **kwargs)
1714 return populate_streams_wrapper
1717 def normalize(self):
1718 """Normalize the streams returned by `all_streams`.
1720 This method is kept for backwards compatability and only affects the
1721 behavior of `all_streams()` and `all_files()`
1727 for s in self.all_streams():
1728 for f in s.all_files():
1729 streamname, filename = split(s.name() + "/" + f.name())
1730 if streamname not in streams:
1731 streams[streamname] = {}
1732 if filename not in streams[streamname]:
1733 streams[streamname][filename] = []
1734 for r in f.segments:
1735 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1737 self._streams = [normalize_stream(s, streams[s])
1738 for s in sorted(streams)]
1740 def all_streams(self):
1741 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1742 for s in self._streams]
1745 def all_files(self):
1746 for s in self.all_streams():
1747 for f in s.all_files():