10 from collections import deque
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
23 from arvados.retry import retry_method
25 _logger = logging.getLogger('arvados.collection')
27 class CollectionBase(object):
31 def __exit__(self, exc_type, exc_value, traceback):
35 if self._keep_client is None:
36 self._keep_client = KeepClient(api_client=self._api_client,
37 num_retries=self.num_retries)
38 return self._keep_client
40 def stripped_manifest(self):
41 """Get the manifest with locator hints stripped.
43 Return the manifest for the current collection with all
44 non-portable hints (i.e., permission signatures and other
45 hints other than size hints) removed from the locators.
47 raw = self.manifest_text()
49 for line in raw.split("\n"):
52 clean_fields = fields[:1] + [
53 (re.sub(r'\+[^\d][^\+]*', '', x)
54 if re.match(util.keep_locator_pattern, x)
57 clean += [' '.join(clean_fields), "\n"]
61 class _WriterFile(_FileLikeObjectBase):
62 def __init__(self, coll_writer, name):
63 super(_WriterFile, self).__init__(name, 'wb')
64 self.dest = coll_writer
67 super(_WriterFile, self).close()
68 self.dest.finish_current_file()
70 @_FileLikeObjectBase._before_close
71 def write(self, data):
74 @_FileLikeObjectBase._before_close
75 def writelines(self, seq):
79 @_FileLikeObjectBase._before_close
81 self.dest.flush_data()
84 class CollectionWriter(CollectionBase):
85 def __init__(self, api_client=None, num_retries=0, replication=None):
86 """Instantiate a CollectionWriter.
88 CollectionWriter lets you build a new Arvados Collection from scratch.
89 Write files to it. The CollectionWriter will upload data to Keep as
90 appropriate, and provide you with the Collection manifest text when
94 * api_client: The API client to use to look up Collections. If not
95 provided, CollectionReader will build one from available Arvados
97 * num_retries: The default number of times to retry failed
98 service requests. Default 0. You may change this value
99 after instantiation, but note those changes may not
100 propagate to related objects like the Keep client.
101 * replication: The number of copies of each block to store.
102 If this argument is None or not supplied, replication is
103 the server-provided default if available, otherwise 2.
105 self._api_client = api_client
106 self.num_retries = num_retries
107 self.replication = (2 if replication is None else replication)
108 self._keep_client = None
109 self._data_buffer = []
110 self._data_buffer_len = 0
111 self._current_stream_files = []
112 self._current_stream_length = 0
113 self._current_stream_locators = []
114 self._current_stream_name = '.'
115 self._current_file_name = None
116 self._current_file_pos = 0
117 self._finished_streams = []
118 self._close_file = None
119 self._queued_file = None
120 self._queued_dirents = deque()
121 self._queued_trees = deque()
122 self._last_open = None
124 def __exit__(self, exc_type, exc_value, traceback):
128 def do_queued_work(self):
129 # The work queue consists of three pieces:
130 # * _queued_file: The file object we're currently writing to the
132 # * _queued_dirents: Entries under the current directory
133 # (_queued_trees[0]) that we want to write or recurse through.
134 # This may contain files from subdirectories if
135 # max_manifest_depth == 0 for this directory.
136 # * _queued_trees: Directories that should be written as separate
137 # streams to the Collection.
138 # This function handles the smallest piece of work currently queued
139 # (current file, then current directory, then next directory) until
140 # no work remains. The _work_THING methods each do a unit of work on
141 # THING. _queue_THING methods add a THING to the work queue.
143 if self._queued_file:
145 elif self._queued_dirents:
147 elif self._queued_trees:
152 def _work_file(self):
154 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
158 self.finish_current_file()
160 self._queued_file.close()
161 self._close_file = None
162 self._queued_file = None
164 def _work_dirents(self):
165 path, stream_name, max_manifest_depth = self._queued_trees[0]
166 if stream_name != self.current_stream_name():
167 self.start_new_stream(stream_name)
168 while self._queued_dirents:
169 dirent = self._queued_dirents.popleft()
170 target = os.path.join(path, dirent)
171 if os.path.isdir(target):
172 self._queue_tree(target,
173 os.path.join(stream_name, dirent),
174 max_manifest_depth - 1)
176 self._queue_file(target, dirent)
178 if not self._queued_dirents:
179 self._queued_trees.popleft()
181 def _work_trees(self):
182 path, stream_name, max_manifest_depth = self._queued_trees[0]
183 d = util.listdir_recursive(
184 path, max_depth = (None if max_manifest_depth == 0 else 0))
186 self._queue_dirents(stream_name, d)
188 self._queued_trees.popleft()
190 def _queue_file(self, source, filename=None):
191 assert (self._queued_file is None), "tried to queue more than one file"
192 if not hasattr(source, 'read'):
193 source = open(source, 'rb')
194 self._close_file = True
196 self._close_file = False
198 filename = os.path.basename(source.name)
199 self.start_new_file(filename)
200 self._queued_file = source
202 def _queue_dirents(self, stream_name, dirents):
203 assert (not self._queued_dirents), "tried to queue more than one tree"
204 self._queued_dirents = deque(sorted(dirents))
206 def _queue_tree(self, path, stream_name, max_manifest_depth):
207 self._queued_trees.append((path, stream_name, max_manifest_depth))
209 def write_file(self, source, filename=None):
210 self._queue_file(source, filename)
211 self.do_queued_work()
213 def write_directory_tree(self,
214 path, stream_name='.', max_manifest_depth=-1):
215 self._queue_tree(path, stream_name, max_manifest_depth)
216 self.do_queued_work()
218 def write(self, newdata):
219 if hasattr(newdata, '__iter__'):
223 self._data_buffer.append(newdata)
224 self._data_buffer_len += len(newdata)
225 self._current_stream_length += len(newdata)
226 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
229 def open(self, streampath, filename=None):
230 """open(streampath[, filename]) -> file-like object
232 Pass in the path of a file to write to the Collection, either as a
233 single string or as two separate stream name and file name arguments.
234 This method returns a file-like object you can write to add it to the
237 You may only have one file object from the Collection open at a time,
238 so be sure to close the object when you're done. Using the object in
239 a with statement makes that easy::
241 with cwriter.open('./doc/page1.txt') as outfile:
242 outfile.write(page1_data)
243 with cwriter.open('./doc/page2.txt') as outfile:
244 outfile.write(page2_data)
247 streampath, filename = split(streampath)
248 if self._last_open and not self._last_open.closed:
249 raise errors.AssertionError(
250 "can't open '{}' when '{}' is still open".format(
251 filename, self._last_open.name))
252 if streampath != self.current_stream_name():
253 self.start_new_stream(streampath)
254 self.set_current_file_name(filename)
255 self._last_open = _WriterFile(self, filename)
256 return self._last_open
258 def flush_data(self):
259 data_buffer = ''.join(self._data_buffer)
261 self._current_stream_locators.append(
263 data_buffer[0:config.KEEP_BLOCK_SIZE],
264 copies=self.replication))
265 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
266 self._data_buffer_len = len(self._data_buffer[0])
268 def start_new_file(self, newfilename=None):
269 self.finish_current_file()
270 self.set_current_file_name(newfilename)
272 def set_current_file_name(self, newfilename):
273 if re.search(r'[\t\n]', newfilename):
274 raise errors.AssertionError(
275 "Manifest filenames cannot contain whitespace: %s" %
277 elif re.search(r'\x00', newfilename):
278 raise errors.AssertionError(
279 "Manifest filenames cannot contain NUL characters: %s" %
281 self._current_file_name = newfilename
283 def current_file_name(self):
284 return self._current_file_name
286 def finish_current_file(self):
287 if self._current_file_name is None:
288 if self._current_file_pos == self._current_stream_length:
290 raise errors.AssertionError(
291 "Cannot finish an unnamed file " +
292 "(%d bytes at offset %d in '%s' stream)" %
293 (self._current_stream_length - self._current_file_pos,
294 self._current_file_pos,
295 self._current_stream_name))
296 self._current_stream_files.append([
297 self._current_file_pos,
298 self._current_stream_length - self._current_file_pos,
299 self._current_file_name])
300 self._current_file_pos = self._current_stream_length
301 self._current_file_name = None
303 def start_new_stream(self, newstreamname='.'):
304 self.finish_current_stream()
305 self.set_current_stream_name(newstreamname)
307 def set_current_stream_name(self, newstreamname):
308 if re.search(r'[\t\n]', newstreamname):
309 raise errors.AssertionError(
310 "Manifest stream names cannot contain whitespace: '%s'" %
312 self._current_stream_name = '.' if newstreamname=='' else newstreamname
314 def current_stream_name(self):
315 return self._current_stream_name
317 def finish_current_stream(self):
318 self.finish_current_file()
320 if not self._current_stream_files:
322 elif self._current_stream_name is None:
323 raise errors.AssertionError(
324 "Cannot finish an unnamed stream (%d bytes in %d files)" %
325 (self._current_stream_length, len(self._current_stream_files)))
327 if not self._current_stream_locators:
328 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
329 self._finished_streams.append([self._current_stream_name,
330 self._current_stream_locators,
331 self._current_stream_files])
332 self._current_stream_files = []
333 self._current_stream_length = 0
334 self._current_stream_locators = []
335 self._current_stream_name = None
336 self._current_file_pos = 0
337 self._current_file_name = None
340 """Store the manifest in Keep and return its locator.
342 This is useful for storing manifest fragments (task outputs)
343 temporarily in Keep during a Crunch job.
345 In other cases you should make a collection instead, by
346 sending manifest_text() to the API server's "create
347 collection" endpoint.
349 return self._my_keep().put(self.manifest_text(), copies=self.replication)
351 def portable_data_hash(self):
352 stripped = self.stripped_manifest()
353 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
355 def manifest_text(self):
356 self.finish_current_stream()
359 for stream in self._finished_streams:
360 if not re.search(r'^\.(/.*)?$', stream[0]):
362 manifest += stream[0].replace(' ', '\\040')
363 manifest += ' ' + ' '.join(stream[1])
364 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
369 def data_locators(self):
371 for name, locators, files in self._finished_streams:
376 class ResumableCollectionWriter(CollectionWriter):
377 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
378 '_current_stream_locators', '_current_stream_name',
379 '_current_file_name', '_current_file_pos', '_close_file',
380 '_data_buffer', '_dependencies', '_finished_streams',
381 '_queued_dirents', '_queued_trees']
383 def __init__(self, api_client=None, **kwargs):
384 self._dependencies = {}
385 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
388 def from_state(cls, state, *init_args, **init_kwargs):
389 # Try to build a new writer from scratch with the given state.
390 # If the state is not suitable to resume (because files have changed,
391 # been deleted, aren't predictable, etc.), raise a
392 # StaleWriterStateError. Otherwise, return the initialized writer.
393 # The caller is responsible for calling writer.do_queued_work()
394 # appropriately after it's returned.
395 writer = cls(*init_args, **init_kwargs)
396 for attr_name in cls.STATE_PROPS:
397 attr_value = state[attr_name]
398 attr_class = getattr(writer, attr_name).__class__
399 # Coerce the value into the same type as the initial value, if
401 if attr_class not in (type(None), attr_value.__class__):
402 attr_value = attr_class(attr_value)
403 setattr(writer, attr_name, attr_value)
404 # Check dependencies before we try to resume anything.
405 if any(KeepLocator(ls).permission_expired()
406 for ls in writer._current_stream_locators):
407 raise errors.StaleWriterStateError(
408 "locators include expired permission hint")
409 writer.check_dependencies()
410 if state['_current_file'] is not None:
411 path, pos = state['_current_file']
413 writer._queued_file = open(path, 'rb')
414 writer._queued_file.seek(pos)
415 except IOError as error:
416 raise errors.StaleWriterStateError(
417 "failed to reopen active file {}: {}".format(path, error))
420 def check_dependencies(self):
421 for path, orig_stat in self._dependencies.items():
422 if not S_ISREG(orig_stat[ST_MODE]):
423 raise errors.StaleWriterStateError("{} not file".format(path))
425 now_stat = tuple(os.stat(path))
426 except OSError as error:
427 raise errors.StaleWriterStateError(
428 "failed to stat {}: {}".format(path, error))
429 if ((not S_ISREG(now_stat[ST_MODE])) or
430 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
431 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
432 raise errors.StaleWriterStateError("{} changed".format(path))
434 def dump_state(self, copy_func=lambda x: x):
435 state = {attr: copy_func(getattr(self, attr))
436 for attr in self.STATE_PROPS}
437 if self._queued_file is None:
438 state['_current_file'] = None
440 state['_current_file'] = (os.path.realpath(self._queued_file.name),
441 self._queued_file.tell())
444 def _queue_file(self, source, filename=None):
446 src_path = os.path.realpath(source)
448 raise errors.AssertionError("{} not a file path".format(source))
450 path_stat = os.stat(src_path)
451 except OSError as stat_error:
453 super(ResumableCollectionWriter, self)._queue_file(source, filename)
454 fd_stat = os.fstat(self._queued_file.fileno())
455 if not S_ISREG(fd_stat.st_mode):
456 # We won't be able to resume from this cache anyway, so don't
457 # worry about further checks.
458 self._dependencies[source] = tuple(fd_stat)
459 elif path_stat is None:
460 raise errors.AssertionError(
461 "could not stat {}: {}".format(source, stat_error))
462 elif path_stat.st_ino != fd_stat.st_ino:
463 raise errors.AssertionError(
464 "{} changed between open and stat calls".format(source))
466 self._dependencies[src_path] = tuple(fd_stat)
468 def write(self, data):
469 if self._queued_file is None:
470 raise errors.AssertionError(
471 "resumable writer can't accept unsourced data")
472 return super(ResumableCollectionWriter, self).write(data)
480 COLLECTION = "collection"
482 class RichCollectionBase(CollectionBase):
483 """Base class for Collections and Subcollections.
485 Implements the majority of functionality relating to accessing items in the
490 def __init__(self, parent=None):
492 self._committed = False
493 self._callback = None
497 raise NotImplementedError()
500 raise NotImplementedError()
502 def _my_block_manager(self):
503 raise NotImplementedError()
506 raise NotImplementedError()
508 def root_collection(self):
509 raise NotImplementedError()
511 def notify(self, event, collection, name, item):
512 raise NotImplementedError()
514 def stream_name(self):
515 raise NotImplementedError()
519 def find_or_create(self, path, create_type):
520 """Recursively search the specified file path.
522 May return either a `Collection` or `ArvadosFile`. If not found, will
523 create a new item at the specified path based on `create_type`. Will
524 create intermediate subcollections needed to contain the final item in
528 One of `arvados.collection.FILE` or
529 `arvados.collection.COLLECTION`. If the path is not found, and value
530 of create_type is FILE then create and return a new ArvadosFile for
531 the last path component. If COLLECTION, then create and return a new
532 Collection for the last path component.
536 pathcomponents = path.split("/", 1)
537 if pathcomponents[0]:
538 item = self._items.get(pathcomponents[0])
539 if len(pathcomponents) == 1:
542 if create_type == COLLECTION:
543 item = Subcollection(self, pathcomponents[0])
545 item = ArvadosFile(self, pathcomponents[0])
546 self._items[pathcomponents[0]] = item
547 self.set_committed(False)
548 self.notify(ADD, self, pathcomponents[0], item)
552 # create new collection
553 item = Subcollection(self, pathcomponents[0])
554 self._items[pathcomponents[0]] = item
555 self.set_committed(False)
556 self.notify(ADD, self, pathcomponents[0], item)
557 if isinstance(item, RichCollectionBase):
558 return item.find_or_create(pathcomponents[1], create_type)
560 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
565 def find(self, path):
566 """Recursively search the specified file path.
568 May return either a Collection or ArvadosFile. Return None if not
570 If path is invalid (ex: starts with '/'), an IOError exception will be
575 raise errors.ArgumentError("Parameter 'path' is empty.")
577 pathcomponents = path.split("/", 1)
578 if pathcomponents[0] == '':
579 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
581 item = self._items.get(pathcomponents[0])
584 elif len(pathcomponents) == 1:
587 if isinstance(item, RichCollectionBase):
588 if pathcomponents[1]:
589 return item.find(pathcomponents[1])
593 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
596 def mkdirs(self, path):
597 """Recursive subcollection create.
599 Like `os.makedirs()`. Will create intermediate subcollections needed
600 to contain the leaf subcollection path.
604 if self.find(path) != None:
605 raise IOError(errno.EEXIST, "Directory or file exists", path)
607 return self.find_or_create(path, COLLECTION)
609 def open(self, path, mode="r"):
610 """Open a file-like object for access.
613 path to a file in the collection
615 one of "r", "r+", "w", "w+", "a", "a+"
619 opens for reading and writing. Reads/writes share a file pointer.
621 truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
623 opens for reading and writing. All writes are appended to
624 the end of the file. Writing does not affect the file pointer for
627 mode = mode.replace("b", "")
628 if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
629 raise errors.ArgumentError("Bad mode '%s'" % mode)
630 create = (mode != "r")
632 if create and not self.writable():
633 raise IOError(errno.EROFS, "Collection is read only")
636 arvfile = self.find_or_create(path, FILE)
638 arvfile = self.find(path)
641 raise IOError(errno.ENOENT, "File not found", path)
642 if not isinstance(arvfile, ArvadosFile):
643 raise IOError(errno.EISDIR, "Is a directory", path)
648 name = os.path.basename(path)
651 return ArvadosFileReader(arvfile, num_retries=self.num_retries)
653 return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
656 """Determine if the collection has been modified since last commited."""
657 return not self.committed()
661 """Determine if the collection has been committed to the API server."""
662 return self._committed
665 def set_committed(self, value=True):
666 """Recursively set committed flag.
668 If value is True, set committed to be True for this and all children.
670 If value is False, set committed to be False for this and all parents.
672 if value == self._committed:
675 for k,v in self._items.items():
676 v.set_committed(True)
677 self._committed = True
679 self._committed = False
680 if self.parent is not None:
681 self.parent.set_committed(False)
685 """Iterate over names of files and collections contained in this collection."""
686 return iter(self._items.keys())
689 def __getitem__(self, k):
690 """Get a file or collection that is directly contained by this collection.
692 If you want to search a path, use `find()` instead.
695 return self._items[k]
698 def __contains__(self, k):
699 """Test if there is a file or collection a directly contained by this collection."""
700 return k in self._items
704 """Get the number of items directly contained in this collection."""
705 return len(self._items)
709 def __delitem__(self, p):
710 """Delete an item by name which is directly contained by this collection."""
712 self.set_committed(False)
713 self.notify(DEL, self, p, None)
717 """Get a list of names of files and collections directly contained in this collection."""
718 return self._items.keys()
722 """Get a list of files and collection objects directly contained in this collection."""
723 return self._items.values()
727 """Get a list of (name, object) tuples directly contained in this collection."""
728 return self._items.items()
730 def exists(self, path):
731 """Test if there is a file or collection at `path`."""
732 return self.find(path) is not None
736 def remove(self, path, recursive=False):
737 """Remove the file or subcollection (directory) at `path`.
740 Specify whether to remove non-empty subcollections (True), or raise an error (False).
744 raise errors.ArgumentError("Parameter 'path' is empty.")
746 pathcomponents = path.split("/", 1)
747 item = self._items.get(pathcomponents[0])
749 raise IOError(errno.ENOENT, "File not found", path)
750 if len(pathcomponents) == 1:
751 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
752 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
753 deleteditem = self._items[pathcomponents[0]]
754 del self._items[pathcomponents[0]]
755 self.set_committed(False)
756 self.notify(DEL, self, pathcomponents[0], deleteditem)
758 item.remove(pathcomponents[1])
760 def _clonefrom(self, source):
761 for k,v in source.items():
762 self._items[k] = v.clone(self, k)
765 raise NotImplementedError()
769 def add(self, source_obj, target_name, overwrite=False, reparent=False):
770 """Copy or move a file or subcollection to this collection.
773 An ArvadosFile, or Subcollection object
776 Destination item name. If the target name already exists and is a
777 file, this will raise an error unless you specify `overwrite=True`.
780 Whether to overwrite target file if it already exists.
783 If True, source_obj will be moved from its parent collection to this collection.
784 If False, source_obj will be copied and the parent collection will be
789 if target_name in self and not overwrite:
790 raise IOError(errno.EEXIST, "File already exists", target_name)
793 if target_name in self:
794 modified_from = self[target_name]
796 # Actually make the move or copy.
798 source_obj._reparent(self, target_name)
801 item = source_obj.clone(self, target_name)
803 self._items[target_name] = item
804 self.set_committed(False)
807 self.notify(MOD, self, target_name, (modified_from, item))
809 self.notify(ADD, self, target_name, item)
811 def _get_src_target(self, source, target_path, source_collection, create_dest):
812 if source_collection is None:
813 source_collection = self
816 if isinstance(source, basestring):
817 source_obj = source_collection.find(source)
818 if source_obj is None:
819 raise IOError(errno.ENOENT, "File not found", source)
820 sourcecomponents = source.split("/")
823 sourcecomponents = None
825 # Find parent collection the target path
826 targetcomponents = target_path.split("/")
828 # Determine the name to use.
829 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
832 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
835 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
837 if len(targetcomponents) > 1:
838 target_dir = self.find("/".join(targetcomponents[0:-1]))
842 if target_dir is None:
843 raise IOError(errno.ENOENT, "Target directory not found", target_name)
845 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
846 target_dir = target_dir[target_name]
847 target_name = sourcecomponents[-1]
849 return (source_obj, target_dir, target_name)
853 def copy(self, source, target_path, source_collection=None, overwrite=False):
854 """Copy a file or subcollection to a new path in this collection.
857 A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
860 Destination file or path. If the target path already exists and is a
861 subcollection, the item will be placed inside the subcollection. If
862 the target path already exists and is a file, this will raise an error
863 unless you specify `overwrite=True`.
866 Collection to copy `source_path` from (default `self`)
869 Whether to overwrite target file if it already exists.
872 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
873 target_dir.add(source_obj, target_name, overwrite, False)
877 def rename(self, source, target_path, source_collection=None, overwrite=False):
878 """Move a file or subcollection from `source_collection` to a new path in this collection.
881 A string with a path to source file or subcollection.
884 Destination file or path. If the target path already exists and is a
885 subcollection, the item will be placed inside the subcollection. If
886 the target path already exists and is a file, this will raise an error
887 unless you specify `overwrite=True`.
890 Collection to copy `source_path` from (default `self`)
893 Whether to overwrite target file if it already exists.
896 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
897 if not source_obj.writable():
898 raise IOError(errno.EROFS, "Source collection is read only", source)
899 target_dir.add(source_obj, target_name, overwrite, True)
901 def portable_manifest_text(self, stream_name="."):
902 """Get the manifest text for this collection, sub collections and files.
904 This method does not flush outstanding blocks to Keep. It will return
905 a normalized manifest with access tokens stripped.
908 Name to use for this stream (directory)
911 return self._get_manifest_text(stream_name, True, True)
914 def manifest_text(self, stream_name=".", strip=False, normalize=False,
915 only_committed=False):
916 """Get the manifest text for this collection, sub collections and files.
918 This method will flush outstanding blocks to Keep. By default, it will
919 not normalize an unmodified manifest or strip access tokens.
922 Name to use for this stream (directory)
925 If True, remove signing tokens from block locators if present.
926 If False (default), block locators are left unchanged.
929 If True, always export the manifest text in normalized form
930 even if the Collection is not modified. If False (default) and the collection
931 is not modified, return the original manifest text even if it is not
935 If True, don't commit pending blocks.
939 if not only_committed:
940 self._my_block_manager().commit_all()
941 return self._get_manifest_text(stream_name, strip, normalize,
942 only_committed=only_committed)
945 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
946 """Get the manifest text for this collection, sub collections and files.
949 Name to use for this stream (directory)
952 If True, remove signing tokens from block locators if present.
953 If False (default), block locators are left unchanged.
956 If True, always export the manifest text in normalized form
957 even if the Collection is not modified. If False (default) and the collection
958 is not modified, return the original manifest text even if it is not
962 If True, only include blocks that were already committed to Keep.
966 if not self.committed() or self._manifest_text is None or normalize:
969 sorted_keys = sorted(self.keys())
970 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
971 # Create a stream per file `k`
972 arvfile = self[filename]
974 for segment in arvfile.segments():
975 loc = segment.locator
976 if arvfile.parent._my_block_manager().is_bufferblock(loc):
979 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
981 loc = KeepLocator(loc).stripped()
982 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
983 segment.segment_offset, segment.range_size))
984 stream[filename] = filestream
986 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
987 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
988 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
992 return self.stripped_manifest()
994 return self._manifest_text
997 def diff(self, end_collection, prefix=".", holding_collection=None):
998 """Generate list of add/modify/delete actions.
1000 When given to `apply`, will change `self` to match `end_collection`
1004 if holding_collection is None:
1005 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1007 if k not in end_collection:
1008 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1009 for k in end_collection:
1011 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1012 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1013 elif end_collection[k] != self[k]:
1014 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1016 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1018 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1023 def apply(self, changes):
1024 """Apply changes from `diff`.
1026 If a change conflicts with a local change, it will be saved to an
1027 alternate path indicating the conflict.
1031 self.set_committed(False)
1032 for change in changes:
1033 event_type = change[0]
1036 local = self.find(path)
1037 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1039 if event_type == ADD:
1041 # No local file at path, safe to copy over new file
1042 self.copy(initial, path)
1043 elif local is not None and local != initial:
1044 # There is already local file and it is different:
1045 # save change to conflict file.
1046 self.copy(initial, conflictpath)
1047 elif event_type == MOD or event_type == TOK:
1049 if local == initial:
1050 # Local matches the "initial" item so it has not
1051 # changed locally and is safe to update.
1052 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1053 # Replace contents of local file with new contents
1054 local.replace_contents(final)
1056 # Overwrite path with new item; this can happen if
1057 # path was a file and is now a collection or vice versa
1058 self.copy(final, path, overwrite=True)
1060 # Local is missing (presumably deleted) or local doesn't
1061 # match the "start" value, so save change to conflict file
1062 self.copy(final, conflictpath)
1063 elif event_type == DEL:
1064 if local == initial:
1065 # Local item matches "initial" value, so it is safe to remove.
1066 self.remove(path, recursive=True)
1067 # else, the file is modified or already removed, in either
1068 # case we don't want to try to remove it.
1070 def portable_data_hash(self):
1071 """Get the portable data hash for this collection's manifest."""
1072 if self._manifest_locator and self.committed():
1073 # If the collection is already saved on the API server, and it's committed
1074 # then return API server's PDH response.
1075 return self._portable_data_hash
1077 stripped = self.portable_manifest_text()
1078 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1081 def subscribe(self, callback):
1082 if self._callback is None:
1083 self._callback = callback
1085 raise errors.ArgumentError("A callback is already set on this collection.")
1088 def unsubscribe(self):
1089 if self._callback is not None:
1090 self._callback = None
1093 def notify(self, event, collection, name, item):
1095 self._callback(event, collection, name, item)
1096 self.root_collection().notify(event, collection, name, item)
1099 def __eq__(self, other):
1102 if not isinstance(other, RichCollectionBase):
1104 if len(self._items) != len(other):
1106 for k in self._items:
1109 if self._items[k] != other[k]:
1113 def __ne__(self, other):
1114 return not self.__eq__(other)
1118 """Flush bufferblocks to Keep."""
1119 for e in self.values():
1123 class Collection(RichCollectionBase):
1124 """Represents the root of an Arvados Collection.
1126 This class is threadsafe. The root collection object, all subcollections
1127 and files are protected by a single lock (i.e. each access locks the entire
1133 :To read an existing file:
1134 `c.open("myfile", "r")`
1136 :To write a new file:
1137 `c.open("myfile", "w")`
1139 :To determine if a file exists:
1140 `c.find("myfile") is not None`
1143 `c.copy("source", "dest")`
1146 `c.remove("myfile")`
1148 :To save to an existing collection record:
1151 :To save a new collection record:
1154 :To merge remote changes into this object:
1157 Must be associated with an API server Collection record (during
1158 initialization, or using `save_new`) to use `save` or `update`
1162 def __init__(self, manifest_locator_or_text=None,
1169 replication_desired=None,
1171 """Collection constructor.
1173 :manifest_locator_or_text:
1174 One of Arvados collection UUID, block locator of
1175 a manifest, raw manifest text, or None (to create an empty collection).
1177 the parent Collection, may be None.
1180 A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1181 Prefer this over supplying your own api_client and keep_client (except in testing).
1182 Will use default config settings if not specified.
1185 The API client object to use for requests. If not specified, create one using `apiconfig`.
1188 the Keep client to use for requests. If not specified, create one using `apiconfig`.
1191 the number of retries for API and Keep requests.
1194 the block manager to use. If not specified, create one.
1196 :replication_desired:
1197 How many copies should Arvados maintain. If None, API server default
1198 configuration applies. If not None, this value will also be used
1199 for determining the number of block copies being written.
1202 super(Collection, self).__init__(parent)
1203 self._api_client = api_client
1204 self._keep_client = keep_client
1205 self._block_manager = block_manager
1206 self.replication_desired = replication_desired
1207 self.put_threads = put_threads
1210 self._config = apiconfig
1212 self._config = config.settings()
1214 self.num_retries = num_retries if num_retries is not None else 0
1215 self._manifest_locator = None
1216 self._manifest_text = None
1217 self._portable_data_hash = None
1218 self._api_response = None
1219 self._past_versions = set()
1221 self.lock = threading.RLock()
1224 if manifest_locator_or_text:
1225 if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1226 self._manifest_locator = manifest_locator_or_text
1227 elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1228 self._manifest_locator = manifest_locator_or_text
1229 elif re.match(util.manifest_pattern, manifest_locator_or_text):
1230 self._manifest_text = manifest_locator_or_text
1232 raise errors.ArgumentError(
1233 "Argument to CollectionReader is not a manifest or a collection UUID")
1237 except (IOError, errors.SyntaxError) as e:
1238 raise errors.ArgumentError("Error processing manifest text: %s", e)
1240 def root_collection(self):
1243 def stream_name(self):
1250 def known_past_version(self, modified_at_and_portable_data_hash):
1251 return modified_at_and_portable_data_hash in self._past_versions
1255 def update(self, other=None, num_retries=None):
1256 """Merge the latest collection on the API server with the current collection."""
1259 if self._manifest_locator is None:
1260 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1261 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1262 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1263 response.get("portable_data_hash") != self.portable_data_hash()):
1264 # The record on the server is different from our current one, but we've seen it before,
1265 # so ignore it because it's already been merged.
1266 # However, if it's the same as our current record, proceed with the update, because we want to update
1270 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1271 other = CollectionReader(response["manifest_text"])
1272 baseline = CollectionReader(self._manifest_text)
1273 self.apply(baseline.diff(other))
1274 self._manifest_text = self.manifest_text()
1278 if self._api_client is None:
1279 self._api_client = ThreadSafeApiCache(self._config)
1280 if self._keep_client is None:
1281 self._keep_client = self._api_client.keep
1282 return self._api_client
1286 if self._keep_client is None:
1287 if self._api_client is None:
1290 self._keep_client = KeepClient(api_client=self._api_client)
1291 return self._keep_client
1294 def _my_block_manager(self):
1295 if self._block_manager is None:
1296 copies = (self.replication_desired or
1297 self._my_api()._rootDesc.get('defaultCollectionReplication',
1299 self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1300 return self._block_manager
1302 def _remember_api_response(self, response):
1303 self._api_response = response
1304 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1306 def _populate_from_api_server(self):
1307 # As in KeepClient itself, we must wait until the last
1308 # possible moment to instantiate an API client, in order to
1309 # avoid tripping up clients that don't have access to an API
1310 # server. If we do build one, make sure our Keep client uses
1311 # it. If instantiation fails, we'll fall back to the except
1312 # clause, just like any other Collection lookup
1313 # failure. Return an exception, or None if successful.
1315 self._remember_api_response(self._my_api().collections().get(
1316 uuid=self._manifest_locator).execute(
1317 num_retries=self.num_retries))
1318 self._manifest_text = self._api_response['manifest_text']
1319 # If not overriden via kwargs, we should try to load the
1320 # replication_desired from the API server
1321 if self.replication_desired is None:
1322 self.replication_desired = self._api_response.get('replication_desired', None)
1324 except Exception as e:
1327 def _populate_from_keep(self):
1328 # Retrieve a manifest directly from Keep. This has a chance of
1329 # working if [a] the locator includes a permission signature
1330 # or [b] the Keep services are operating in world-readable
1331 # mode. Return an exception, or None if successful.
1333 self._manifest_text = self._my_keep().get(
1334 self._manifest_locator, num_retries=self.num_retries)
1335 except Exception as e:
1338 def _populate(self):
1339 if self._manifest_locator is None and self._manifest_text is None:
1341 error_via_api = None
1342 error_via_keep = None
1343 should_try_keep = ((self._manifest_text is None) and
1344 util.keep_locator_pattern.match(
1345 self._manifest_locator))
1346 if ((self._manifest_text is None) and
1347 util.signed_locator_pattern.match(self._manifest_locator)):
1348 error_via_keep = self._populate_from_keep()
1349 if self._manifest_text is None:
1350 error_via_api = self._populate_from_api_server()
1351 if error_via_api is not None and not should_try_keep:
1353 if ((self._manifest_text is None) and
1354 not error_via_keep and
1356 # Looks like a keep locator, and we didn't already try keep above
1357 error_via_keep = self._populate_from_keep()
1358 if self._manifest_text is None:
1360 raise errors.NotFoundError(
1361 ("Failed to retrieve collection '{}' " +
1362 "from either API server ({}) or Keep ({})."
1364 self._manifest_locator,
1368 self._baseline_manifest = self._manifest_text
1369 self._import_manifest(self._manifest_text)
1372 def _has_collection_uuid(self):
1373 return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1375 def __enter__(self):
1378 def __exit__(self, exc_type, exc_value, traceback):
1379 """Support scoped auto-commit in a with: block."""
1380 if exc_type is None:
1381 if self.writable() and self._has_collection_uuid():
1385 def stop_threads(self):
1386 if self._block_manager is not None:
1387 self._block_manager.stop_threads()
1390 def manifest_locator(self):
1391 """Get the manifest locator, if any.
1393 The manifest locator will be set when the collection is loaded from an
1394 API server record or the portable data hash of a manifest.
1396 The manifest locator will be None if the collection is newly created or
1397 was created directly from manifest text. The method `save_new()` will
1398 assign a manifest locator.
1401 return self._manifest_locator
1404 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1405 if new_config is None:
1406 new_config = self._config
1408 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1410 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1412 newcollection._clonefrom(self)
1413 return newcollection
1416 def api_response(self):
1417 """Returns information about this Collection fetched from the API server.
1419 If the Collection exists in Keep but not the API server, currently
1420 returns None. Future versions may provide a synthetic response.
1423 return self._api_response
1425 def find_or_create(self, path, create_type):
1426 """See `RichCollectionBase.find_or_create`"""
1430 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1432 def find(self, path):
1433 """See `RichCollectionBase.find`"""
1437 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1439 def remove(self, path, recursive=False):
1440 """See `RichCollectionBase.remove`"""
1442 raise errors.ArgumentError("Cannot remove '.'")
1444 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1449 def save(self, merge=True, num_retries=None):
1450 """Save collection to an existing collection record.
1452 Commit pending buffer blocks to Keep, merge with remote record (if
1453 merge=True, the default), and update the collection record. Returns
1454 the current manifest text.
1456 Will raise AssertionError if not associated with a collection record on
1457 the API server. If you want to save a manifest to Keep only, see
1461 Update and merge remote changes before saving. Otherwise, any
1462 remote changes will be ignored and overwritten.
1465 Retry count on API calls (if None, use the collection default)
1468 if not self.committed():
1469 if not self._has_collection_uuid():
1470 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
1472 self._my_block_manager().commit_all()
1477 text = self.manifest_text(strip=False)
1478 self._remember_api_response(self._my_api().collections().update(
1479 uuid=self._manifest_locator,
1480 body={'manifest_text': text}
1482 num_retries=num_retries))
1483 self._manifest_text = self._api_response["manifest_text"]
1484 self._portable_data_hash = self._api_response["portable_data_hash"]
1485 self.set_committed(True)
1487 return self._manifest_text
1493 def save_new(self, name=None,
1494 create_collection_record=True,
1496 ensure_unique_name=False,
1498 """Save collection to a new collection record.
1500 Commit pending buffer blocks to Keep and, when create_collection_record
1501 is True (default), create a new collection record. After creating a
1502 new collection record, this Collection object will be associated with
1503 the new record used by `save()`. Returns the current manifest text.
1506 The collection name.
1508 :create_collection_record:
1509 If True, create a collection record on the API server.
1510 If False, only commit blocks to Keep and return the manifest text.
1513 the user, or project uuid that will own this collection.
1514 If None, defaults to the current user.
1516 :ensure_unique_name:
1517 If True, ask the API server to rename the collection
1518 if it conflicts with a collection with the same name and owner. If
1519 False, a name conflict will result in an error.
1522 Retry count on API calls (if None, use the collection default)
1525 self._my_block_manager().commit_all()
1526 text = self.manifest_text(strip=False)
1528 if create_collection_record:
1530 name = "New collection"
1531 ensure_unique_name = True
1533 body = {"manifest_text": text,
1535 "replication_desired": self.replication_desired}
1537 body["owner_uuid"] = owner_uuid
1539 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1540 text = self._api_response["manifest_text"]
1542 self._manifest_locator = self._api_response["uuid"]
1543 self._portable_data_hash = self._api_response["portable_data_hash"]
1545 self._manifest_text = text
1546 self.set_committed(True)
1551 def _import_manifest(self, manifest_text):
1552 """Import a manifest into a `Collection`.
1555 The manifest text to import from.
1559 raise ArgumentError("Can only import manifest into an empty collection")
1568 for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1569 tok = token_and_separator.group(1)
1570 sep = token_and_separator.group(2)
1572 if state == STREAM_NAME:
1573 # starting a new stream
1574 stream_name = tok.replace('\\040', ' ')
1579 self.find_or_create(stream_name, COLLECTION)
1583 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1585 blocksize = long(block_locator.group(1))
1586 blocks.append(Range(tok, streamoffset, blocksize, 0))
1587 streamoffset += blocksize
1591 if state == SEGMENTS:
1592 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1594 pos = long(file_segment.group(1))
1595 size = long(file_segment.group(2))
1596 name = file_segment.group(3).replace('\\040', ' ')
1597 filepath = os.path.join(stream_name, name)
1598 afile = self.find_or_create(filepath, FILE)
1599 if isinstance(afile, ArvadosFile):
1600 afile.add_segment(blocks, pos, size)
1602 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1605 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1611 self.set_committed(True)
1614 def notify(self, event, collection, name, item):
1616 self._callback(event, collection, name, item)
1619 class Subcollection(RichCollectionBase):
1620 """This is a subdirectory within a collection that doesn't have its own API
1623 Subcollection locking falls under the umbrella lock of its root collection.
1627 def __init__(self, parent, name):
1628 super(Subcollection, self).__init__(parent)
1629 self.lock = self.root_collection().lock
1630 self._manifest_text = None
1632 self.num_retries = parent.num_retries
1634 def root_collection(self):
1635 return self.parent.root_collection()
1638 return self.root_collection().writable()
1641 return self.root_collection()._my_api()
1644 return self.root_collection()._my_keep()
1646 def _my_block_manager(self):
1647 return self.root_collection()._my_block_manager()
1649 def stream_name(self):
1650 return os.path.join(self.parent.stream_name(), self.name)
1653 def clone(self, new_parent, new_name):
1654 c = Subcollection(new_parent, new_name)
1660 def _reparent(self, newparent, newname):
1661 self.set_committed(False)
1663 self.parent.remove(self.name, recursive=True)
1664 self.parent = newparent
1666 self.lock = self.parent.root_collection().lock
1669 class CollectionReader(Collection):
1670 """A read-only collection object.
1672 Initialize from an api collection record locator, a portable data hash of a
1673 manifest, or raw manifest text. See `Collection` constructor for detailed
1677 def __init__(self, manifest_locator_or_text, *args, **kwargs):
1678 self._in_init = True
1679 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1680 self._in_init = False
1682 # Forego any locking since it should never change once initialized.
1683 self.lock = NoopLock()
1685 # Backwards compatability with old CollectionReader
1686 # all_streams() and all_files()
1687 self._streams = None
1690 return self._in_init
1692 def _populate_streams(orig_func):
1693 @functools.wraps(orig_func)
1694 def populate_streams_wrapper(self, *args, **kwargs):
1695 # Defer populating self._streams until needed since it creates a copy of the manifest.
1696 if self._streams is None:
1697 if self._manifest_text:
1698 self._streams = [sline.split()
1699 for sline in self._manifest_text.split("\n")
1703 return orig_func(self, *args, **kwargs)
1704 return populate_streams_wrapper
1707 def normalize(self):
1708 """Normalize the streams returned by `all_streams`.
1710 This method is kept for backwards compatability and only affects the
1711 behavior of `all_streams()` and `all_files()`
1717 for s in self.all_streams():
1718 for f in s.all_files():
1719 streamname, filename = split(s.name() + "/" + f.name())
1720 if streamname not in streams:
1721 streams[streamname] = {}
1722 if filename not in streams[streamname]:
1723 streams[streamname][filename] = []
1724 for r in f.segments:
1725 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1727 self._streams = [normalize_stream(s, streams[s])
1728 for s in sorted(streams)]
1730 def all_streams(self):
1731 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1732 for s in self._streams]
1735 def all_files(self):
1736 for s in self.all_streams():
1737 for f in s.all_files():