1 from __future__ import absolute_import
2 from builtins import str
3 from past.builtins import basestring
4 from builtins import object
14 from collections import deque
17 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
18 from .keep import KeepLocator, KeepClient
19 from .stream import StreamReader
20 from ._normalize_stream import normalize_stream
21 from ._ranges import Range, LocatorAndRange
22 from .safeapi import ThreadSafeApiCache
23 import arvados.config as config
24 import arvados.errors as errors
26 import arvados.events as events
27 from arvados.retry import retry_method
29 _logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    """Abstract base for collection readers and writers.

    Provides context-manager support, a lazily constructed Keep client,
    and manifest-normalization helpers shared by subclasses.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        # Context-manager exit; body elided from this excerpt.

        # Lazily build a KeepClient bound to this object's API client and
        # retry policy.  [enclosing `def` elided from this excerpt]
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """Get the manifest with locator hints stripped.

        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.

        raw = self.manifest_text()
        for line in raw.split("\n"):
            clean_fields = fields[:1] + [
                (re.sub(r'\+[^\d][^\+]*', '', x)
                 if re.match(arvados.util.keep_locator_pattern, x)
        clean += [' '.join(clean_fields), "\n"]
class _WriterFile(_FileLikeObjectBase):
    """Write-only file-like handle returned by CollectionWriter.open().

    Delegates all data operations to the owning CollectionWriter.
    """

    def __init__(self, coll_writer, name):
        # Always opened write-binary; coll_writer receives the data.
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

        # close(): let the base class mark the handle closed, then finish
        # the current manifest file entry.  [def line elided here]
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        # Body elided; presumably forwards data to the writer — confirm.

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        # Body elided from this excerpt.

    @_FileLikeObjectBase._before_close
    # flush(): push buffered data through the writer.  [def line elided]
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you are finished.

        Arguments:
        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        # Default to 2 copies per block when the caller does not specify.
        self.replication = (2 if replication is None else replication)
        self._keep_client = None  # built lazily
        # Data buffered in memory, not yet stored in Keep, plus its length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the manifest stream currently being assembled.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work queue: at most one open source file, plus pending directory
        # entries and directory trees (see do_queued_work).
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
    def __exit__(self, exc_type, exc_value, traceback):
        # Context-manager exit; body elided from this excerpt.

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains. The _work_THING methods each do a unit of work on
        # THING. _queue_THING methods add a THING to the work queue.
        # [loop header elided] drains in file -> dirents -> trees order:
        if self._queued_file:
            # [_work_file() call elided from this excerpt]
        elif self._queued_dirents:
            # [_work_dirents() call elided from this excerpt]
        elif self._queued_trees:
            # [_work_trees() call elided from this excerpt]
    def _work_file(self):
        # Stream the queued source file into the collection one Keep block
        # at a time.  [read/write loop header elided from this excerpt]
        buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
        # End the current manifest file entry once the source is exhausted.
        self.finish_current_file()
        # Close the source; presumably guarded by self._close_file (set in
        # _queue_file when we opened the path ourselves) — guard elided.
        self._queued_file.close()
        self._close_file = None
        self._queued_file = None
    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # Make sure we're writing into the stream for this directory.
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                # Subdirectory: queue it as its own stream, one manifest
                # level deeper.
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            # [else branch header and loop-interrupt lines elided] plain
            # file: queue it for writing under its dirent name.
                self._queue_file(target, dirent)
        if not self._queued_dirents:
            # This directory is fully processed; drop it from the tree queue.
            self._queued_trees.popleft()
    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # max_depth=None lists the whole tree when this directory collapses
        # into a single stream (max_manifest_depth == 0); otherwise only
        # the immediate entries are listed and subdirs recurse via queueing.
        d = arvados.util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        # [non-empty guard elided] queue the listing for processing.
        self._queue_dirents(stream_name, d)
        # [else-branch elided] an empty tree is dropped from the queue.
        self._queued_trees.popleft()
    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            # A path was given: open it ourselves and remember to close it
            # when the file is finished (_close_file flag).
            source = open(source, 'rb')
            self._close_file = True
        # [else branch header elided] caller-supplied file object; caller
        # keeps ownership of closing it.
            self._close_file = False
        # [filename-default guard elided] derive the manifest name from the
        # source object's name when not given explicitly.
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source
    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        # Sorted for a deterministic manifest ordering.
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        # Trees are processed FIFO by _work_trees().
        self._queued_trees.append((path, stream_name, max_manifest_depth))
    def write_file(self, source, filename=None):
        """Queue one file (path or file-like object) and write it synchronously."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Queue a directory tree and write it synchronously."""
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
    def write(self, newdata):
        # Iterables are handled element-by-element.  [recursive write loop
        # and early return elided from this excerpt]
        if hasattr(newdata, '__iter__'):
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        # Flush whole Keep blocks as the buffer fills.  [flush_data() call
        # inside the loop elided from this excerpt]
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done. Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        # [condition elided: presumably only when filename is None] split a
        # combined path into stream name and file name — confirm.
        streampath, filename = split(streampath)
        # Only one writer handle may be open at a time.
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open
    def flush_data(self):
        # Concatenate everything buffered so far.  [empty-buffer guard and
        # the Keep put() call itself are elided from this excerpt]
        data_buffer = ''.join(self._data_buffer)
        # Store at most one full Keep block per flush, recording its locator.
        self._current_stream_locators.append(
                data_buffer[0:config.KEEP_BLOCK_SIZE],
                copies=self.replication))
        # Keep any overflow beyond one block buffered for the next flush.
        self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
        self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        # Close out the previous file entry before switching names.
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        # The manifest format is whitespace-delimited, so embedded tabs and
        # newlines would corrupt it.  [format operands elided below]
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
        self._current_file_name = newfilename

    def current_file_name(self):
        # Name of the file currently being written, or None between files.
        return self._current_file_name
    def finish_current_file(self):
        if self._current_file_name is None:
            # No name is fine only if no bytes were written since the last
            # finish; otherwise the data has no filename to attach to.
            # [early return for the no-data case elided]
            if self._current_file_pos == self._current_stream_length:
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        # Record [start offset, length, name] for the finished file and
        # advance the file start position to the current stream length.
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None
    def start_new_stream(self, newstreamname='.'):
        # Finish (and record) the current stream before switching.
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        # Stream names share the manifest's whitespace-delimited format.
        # [format operand elided below]
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace: '%s'" %
        # An empty name means the root stream '.'.
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name
    def finish_current_stream(self):
        self.finish_current_file()
        # [flush of remaining buffered data elided here]
        if not self._current_stream_files:
            # Nothing to record for an empty stream.  [branch body elided]
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        # [else branch header elided] every recorded stream needs at least
        # one locator; use the well-known empty-block locator if no data
        # was ever flushed.
        if not self._current_stream_locators:
            self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
        self._finished_streams.append([self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files])
        # Reset all per-stream state for the next stream.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
344 """Store the manifest in Keep and return its locator.
346 This is useful for storing manifest fragments (task outputs)
347 temporarily in Keep during a Crunch job.
349 In other cases you should make a collection instead, by
350 sending manifest_text() to the API server's "create
351 collection" endpoint.
353 return self._my_keep().put(self.manifest_text(), copies=self.replication)
355 def portable_data_hash(self):
356 stripped = self.stripped_manifest()
357 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
    def manifest_text(self):
        """Serialize every finished stream into Arvados manifest text."""
        self.finish_current_stream()
        # [accumulator initialization elided]
        for stream in self._finished_streams:
            # Stream names outside the root '.' namespace get prefixed.
            # [prefixing branch body elided]
            if not re.search(r'^\.(/.*)?$', stream[0]):
            # Spaces in names are escaped as \040 per the manifest format.
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
    def data_locators(self):
        # Collect the block locators from every finished stream.
        # [accumulator and return statement elided from this excerpt]
        for name, locators, files in self._finished_streams:
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose progress can be checkpointed and resumed.

    dump_state() / from_state() round-trip the attributes listed in
    STATE_PROPS, plus the position of any file currently being written.
    """

    # Attributes serialized by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        # Maps source path -> stat tuple recorded when the file was queued;
        # check_dependencies() compares against it on resume.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
    def from_state(cls, state, *init_args, **init_kwargs):
        # [classmethod decorator line elided from this excerpt]
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError. Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible (e.g. deques may round-trip through JSON as lists).
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            # Reopen the in-progress source file at the saved offset.
            # [try: line elided from this excerpt]
            writer._queued_file = open(path, 'rb')
            writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
    def check_dependencies(self):
        """Raise StaleWriterStateError if any recorded source file changed."""
        for path, orig_stat in list(self._dependencies.items()):
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            # [try: line elided from this excerpt]
            now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            # A regular file whose mtime and size both match is unchanged.
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))
    def dump_state(self, copy_func=lambda x: x):
        """Return a dict capturing resumable writer state.

        copy_func may deep-copy values, e.g. for crash-safe snapshots.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        # [else: header and final return elided] record the open source
        # file's real path and current offset so it can be reopened.
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
    def _queue_file(self, source, filename=None):
        # Resolve the source path up front so dependencies can be tracked.
        # [try: line elided from this excerpt]
        src_path = os.path.realpath(source)
        # [except clause header elided] non-path sources can't be resumed.
        raise errors.AssertionError("{} not a file path".format(source))
        # [try: elided] stat the path; failures are deferred until we know
        # whether the opened file is a regular file (see below).
        path_stat = os.stat(src_path)
        except OSError as stat_error:
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        # [else: header elided] record the stat snapshot for resume checks.
            self._dependencies[src_path] = tuple(fd_stat)
472 def write(self, data):
473 if self._queued_file is None:
474 raise errors.AssertionError(
475 "resumable writer can't accept unsourced data")
476 return super(ResumableCollectionWriter, self).write(data)
484 COLLECTION = "collection"
class RichCollectionBase(CollectionBase):
    """Base class for Collections and Subcollections.

    Implements the majority of functionality relating to accessing items in the
    Collection.
    """

    def __init__(self, parent=None):
        # [parent assignment elided] commit tracking and change callback.
        self._committed = False
        self._callback = None

        # Abstract hooks implemented by Collection/Subcollection.
        # [several enclosing def lines elided from this excerpt]
        raise NotImplementedError()

        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

        raise NotImplementedError()

    def root_collection(self):
        raise NotImplementedError()

    def notify(self, event, collection, name, item):
        raise NotImplementedError()

    def stream_name(self):
        raise NotImplementedError()
    def find_or_create(self, path, create_type):
        """Recursively search the specified file path.

        May return either a `Collection` or `ArvadosFile`. If not found, will
        create a new item at the specified path based on `create_type`. Will
        create intermediate subcollections needed to contain the final item in
        the path.

        :create_type:
          One of `arvados.collection.FILE` or
          `arvados.collection.COLLECTION`. If the path is not found, and value
          of create_type is FILE then create and return a new ArvadosFile for
          the last path component. If COLLECTION, then create and return a new
          Collection for the last path component.
        """
        # Split off the first path component; the rest recurses below.
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                # Leaf component: create it when missing.  [existing-item
                # early return elided from this excerpt]
                if create_type == COLLECTION:
                    item = Subcollection(self, pathcomponents[0])
                # [else: header elided] FILE create type.
                    item = ArvadosFile(self, pathcomponents[0])
                self._items[pathcomponents[0]] = item
                self.set_committed(False)
                self.notify(ADD, self, pathcomponents[0], item)
            # [non-leaf branch header elided from this excerpt]
                # create new collection
                item = Subcollection(self, pathcomponents[0])
                self._items[pathcomponents[0]] = item
                self.set_committed(False)
                self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    return item.find_or_create(pathcomponents[1], create_type)
                # [else: header elided] path tries to descend into a file.
                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
    def find(self, path):
        """Recursively search the specified file path.

        May return either a Collection or ArvadosFile. Return None if not
        found.
        If path is invalid (ex: starts with '/'), an IOError exception will be
        raised.
        """
        # [empty-path guard header elided from this excerpt]
        raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        if pathcomponents[0] == '':
            # Leading '/' leaves an empty first component.
            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

        item = self._items.get(pathcomponents[0])
        # [not-found / single-component returns elided from this excerpt]
        elif len(pathcomponents) == 1:
        if isinstance(item, RichCollectionBase):
            if pathcomponents[1]:
                # Descend into the subcollection with the remaining path.
                return item.find(pathcomponents[1])
        # [else: header elided] descending into a plain file is an error.
            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
600 def mkdirs(self, path):
601 """Recursive subcollection create.
603 Like `os.makedirs()`. Will create intermediate subcollections needed
604 to contain the leaf subcollection path.
608 if self.find(path) != None:
609 raise IOError(errno.EEXIST, "Directory or file exists", path)
611 return self.find_or_create(path, COLLECTION)
    def open(self, path, mode="r"):
        """Open a file-like object for access.

        :path:
          path to a file in the collection
        :mode:
          one of "r", "r+", "w", "w+", "a", "a+"
          (r+) opens for reading and writing. Reads/writes share a file pointer.
          (w+) truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
          (a/a+) opens for reading and writing. All writes are appended to
          the end of the file. Writing does not affect the file pointer for
          reading.
        """
        # Binary mode is a no-op for collection files.
        mode = mode.replace("b", "")
        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
            raise errors.ArgumentError("Bad mode '%s'" % mode)
        # Every mode except plain "r" may create the file.
        create = (mode != "r")

        if create and not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")

        # [branch headers elided] create-modes use find_or_create; plain
        # read uses find.
            arvfile = self.find_or_create(path, FILE)
            arvfile = self.find(path)

        # [None check header elided]
            raise IOError(errno.ENOENT, "File not found", path)
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Is a directory", path)

        # ["w" truncate step elided from this excerpt]
        name = os.path.basename(path)
        # [mode dispatch headers elided] readers for "r", writers otherwise.
            return ArvadosFileReader(arvfile, num_retries=self.num_retries)
            return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
660 """Determine if the collection has been modified since last commited."""
661 return not self.committed()
665 """Determine if the collection has been committed to the API server."""
666 return self._committed
    def set_committed(self, value=True):
        """Recursively set committed flag.

        If value is True, set committed to be True for this and all children.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            # Already in the requested state; nothing to propagate.
        # [branch headers elided] True propagates down to children:
        for k,v in list(self._items.items()):
            v.set_committed(True)
        self._committed = True
        # False propagates up through the parents:
        self._committed = False
        if self.parent is not None:
            self.parent.set_committed(False)
689 """Iterate over names of files and collections contained in this collection."""
690 return iter(list(self._items.keys()))
693 def __getitem__(self, k):
694 """Get a file or collection that is directly contained by this collection.
696 If you want to search a path, use `find()` instead.
699 return self._items[k]
702 def __contains__(self, k):
703 """Test if there is a file or collection a directly contained by this collection."""
704 return k in self._items
708 """Get the number of items directly contained in this collection."""
709 return len(self._items)
713 def __delitem__(self, p):
714 """Delete an item by name which is directly contained by this collection."""
716 self.set_committed(False)
717 self.notify(DEL, self, p, None)
721 """Get a list of names of files and collections directly contained in this collection."""
722 return list(self._items.keys())
726 """Get a list of files and collection objects directly contained in this collection."""
727 return list(self._items.values())
731 """Get a list of (name, object) tuples directly contained in this collection."""
732 return list(self._items.items())
734 def exists(self, path):
735 """Test if there is a file or collection at `path`."""
736 return self.find(path) is not None
    def remove(self, path, recursive=False):
        """Remove the file or subcollection (directory) at `path`.

        :recursive:
          Specify whether to remove non-empty subcollections (True), or raise an error (False).
        """
        # [empty-path guard header elided from this excerpt]
        raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        # [None check header elided]
        raise IOError(errno.ENOENT, "File not found", path)
        if len(pathcomponents) == 1:
            # Leaf: refuse to remove a non-empty subcollection unless asked.
            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
            deleteditem = self._items[pathcomponents[0]]
            del self._items[pathcomponents[0]]
            self.set_committed(False)
            self.notify(DEL, self, pathcomponents[0], deleteditem)
        # [else: header elided] recurse into the child with the rest of the path.
            item.remove(pathcomponents[1])
    def _clonefrom(self, source):
        # Deep-copy every child of `source` into this collection.
        for k,v in list(source.items()):
            self._items[k] = v.clone(self, k)

        # clone(): abstract; implemented by Collection/Subcollection.
        # [enclosing def elided from this excerpt]
        raise NotImplementedError()
    def add(self, source_obj, target_name, overwrite=False, reparent=False):
        """Copy or move a file or subcollection to this collection.

        :source_obj:
          An ArvadosFile, or Subcollection object

        :target_name:
          Destination item name. If the target name already exists and is a
          file, this will raise an error unless you specify `overwrite=True`.

        :overwrite:
          Whether to overwrite target file if it already exists.

        :reparent:
          If True, source_obj will be moved from its parent collection to this collection.
          If False, source_obj will be copied and the parent collection will be
          unmodified.
        """
        # [name validation elided from this excerpt]
        if target_name in self and not overwrite:
            raise IOError(errno.EEXIST, "File already exists", target_name)

        # Remember the replaced item so the notification can report it.
        if target_name in self:
            modified_from = self[target_name]

        # Actually make the move or copy.
        # [reparent/clone branch headers elided from this excerpt]
            source_obj._reparent(self, target_name)
            item = source_obj.clone(self, target_name)

        self._items[target_name] = item
        self.set_committed(False)
        # Fire MOD when an item was replaced, ADD otherwise.
            self.notify(MOD, self, target_name, (modified_from, item))
            self.notify(ADD, self, target_name, item)
    def _get_src_target(self, source, target_path, source_collection, create_dest):
        """Resolve copy/rename arguments to (source_obj, target_dir, target_name)."""
        if source_collection is None:
            source_collection = self

        # A string source is looked up as a path; objects are used directly.
        if isinstance(source, basestring):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError(errno.ENOENT, "File not found", source)
            sourcecomponents = source.split("/")
        # [else: header elided] object source has no path components.
            sourcecomponents = None

        # Find parent collection the target path
        targetcomponents = target_path.split("/")
        # Determine the name to use.
        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
        # [guard header elided] an object source with an empty target name
        # is unresolvable:
            raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")

        # create_dest decides whether intermediate directories are created
        # (copy) or must already exist (rename).  [branch headers elided]
            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
            if len(targetcomponents) > 1:
                target_dir = self.find("/".join(targetcomponents[0:-1]))

        if target_dir is None:
            raise IOError(errno.ENOENT, "Target directory not found", target_name)

        # Copying a path into an existing directory keeps the source's name.
        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        return (source_obj, target_dir, target_name)
    def copy(self, source, target_path, source_collection=None, overwrite=False):
        """Copy a file or subcollection to a new path in this collection.

        :source:
          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.

        :target_path:
          Destination file or path. If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection. If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to copy `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.
        """
        # create_dest=True: intermediate directories are created as needed.
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
        target_dir.add(source_obj, target_name, overwrite, False)
    def rename(self, source, target_path, source_collection=None, overwrite=False):
        """Move a file or subcollection from `source_collection` to a new path in this collection.

        :source:
          A string with a path to source file or subcollection.

        :target_path:
          Destination file or path. If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection. If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to copy `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.
        """
        # create_dest=False: the target directory must already exist.
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
        # A move mutates the source's parent, so it must be writable.
        if not source_obj.writable():
            raise IOError(errno.EROFS, "Source collection is read only", source)
        target_dir.add(source_obj, target_name, overwrite, True)
    def portable_manifest_text(self, stream_name="."):
        """Get the manifest text for this collection, sub collections and files.

        This method does not flush outstanding blocks to Keep. It will return
        a normalized manifest with access tokens stripped.

        :stream_name:
          Name to use for this stream (directory)
        """
        # strip=True, normalize=True yields the portable (signature-free) form.
        return self._get_manifest_text(stream_name, True, True)
    def manifest_text(self, stream_name=".", strip=False, normalize=False,
                      only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        This method will flush outstanding blocks to Keep. By default, it will
        not normalize an unmodified manifest or strip access tokens.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, don't commit pending blocks.
        """
        # Commit pending buffer blocks first unless explicitly skipped.
        if not only_committed:
            self._my_block_manager().commit_all()
        return self._get_manifest_text(stream_name, strip, normalize,
                                       only_committed=only_committed)
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.
        """
        # Regenerate only when dirty, missing, or normalization is forced;
        # otherwise the cached _manifest_text can be reused below.
        if not self.committed() or self._manifest_text is None or normalize:
            # [stream dict / buffer initialization elided from this excerpt]
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `k`
                arvfile = self[filename]
                # [per-file segment list initialization elided]
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # Resolve buffer blocks to their committed locator.
                        # [only_committed skip branch elided from this excerpt]
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    # [strip condition header elided] drop signing tokens.
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                                     segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            # [empty-stream guard elided] emit the normalized stream line.
            buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
        # [cached-manifest branch headers elided from this excerpt]
            return self.stripped_manifest()
            return self._manifest_text
    def diff(self, end_collection, prefix=".", holding_collection=None):
        """Generate list of add/modify/delete actions.

        When given to `apply`, will change `self` to match `end_collection`
        """
        # [changes accumulator initialization elided from this excerpt]
        if holding_collection is None:
            # Clones are parked in a detached collection so they survive
            # later mutations of self/end_collection.
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        # [loop over self's keys elided] items gone from end_collection:
        if k not in end_collection:
            changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
            # [membership check header elided] item exists on both sides:
            if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                # Recurse into matching subcollections.
                changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
            elif end_collection[k] != self[k]:
                changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            # [else: header elided] equal content, token-only change (TOK):
                changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            # [else: header elided] item new in end_collection:
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
    def apply(self, changes):
        """Apply changes from `diff`.

        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.
        """
        # [empty-changes guard elided] any applied change dirties self.
        self.set_committed(False)
        for change in changes:
            event_type = change[0]
            # [unpacking of path/initial/final elided from this excerpt]
            local = self.find(path)
            # Timestamped conflict filename used when local edits collide.
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
            if event_type == ADD:
                # [local-is-None check elided]
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                # [final item unpacking elided from this excerpt]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    # [else: header elided]
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                # [else: header elided]
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
    def portable_data_hash(self):
        """Get the portable data hash for this collection's manifest."""
        if self._manifest_locator and self.committed():
            # If the collection is already saved on the API server, and it's committed
            # then return API server's PDH response.
            return self._portable_data_hash
        # [else: header elided] compute md5(manifest) + '+' + length locally.
            # NOTE(review): hashing a str fails on Python 3 — upstream
            # encodes the manifest before md5; confirm and fix.
            stripped = self.portable_manifest_text()
            return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1085 def subscribe(self, callback):
1086 if self._callback is None:
1087 self._callback = callback
1089 raise errors.ArgumentError("A callback is already set on this collection.")
1092 def unsubscribe(self):
1093 if self._callback is not None:
1094 self._callback = None
1097 def notify(self, event, collection, name, item):
1099 self._callback(event, collection, name, item)
1100 self.root_collection().notify(event, collection, name, item)
1103 def __eq__(self, other):
1106 if not isinstance(other, RichCollectionBase):
1108 if len(self._items) != len(other):
1110 for k in self._items:
1113 if self._items[k] != other[k]:
1117 def __ne__(self, other):
1118 return not self.__eq__(other)
1122 """Flush bufferblocks to Keep."""
1123 for e in list(self.values()):
class Collection(RichCollectionBase):
    """Represents the root of an Arvados Collection.

    This class is threadsafe.  The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the
    entire collection).

    Brief usage:

    :To read an existing file:
      `c.open("myfile", "r")`

    :To write a new file:
      `c.open("myfile", "w")`

    :To determine if a file exists:
      `c.find("myfile") is not None`

    :To copy a file:
      `c.copy("source", "dest")`

    :To delete a file:
      `c.remove("myfile")`

    :To save to an existing collection record:
      `c.save()`

    :To save a new collection record:
      `c.save_new()`

    :To merge remote changes into this object:
      `c.update()`

    Must be associated with an API server Collection record (during
    initialization, or using `save_new`) to use `save` or `update`

    """
    def __init__(self, manifest_locator_or_text=None,
                 api_client=None,
                 keep_client=None,
                 num_retries=None,
                 parent=None,
                 apiconfig=None,
                 block_manager=None,
                 replication_desired=None,
                 put_threads=None):
        """Collection constructor.

        :manifest_locator_or_text:
          One of Arvados collection UUID, block locator of
          a manifest, raw manifest text, or None (to create an empty collection).
        :parent:
          the parent Collection, may be None.
        :apiconfig:
          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.
        :api_client:
          The API client object to use for requests.  If not specified, create one using `apiconfig`.
        :keep_client:
          the Keep client to use for requests.  If not specified, create one using `apiconfig`.
        :num_retries:
          the number of retries for API and Keep requests.
        :block_manager:
          the block manager to use.  If not specified, create one.
        :replication_desired:
          How many copies should Arvados maintain. If None, API server default
          configuration applies. If not None, this value will also be used
          for determining the number of block copies being written.
        :put_threads:
          number of threads to use when writing blocks (passed to the block
          manager).

        """
        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client
        self._block_manager = block_manager
        self.replication_desired = replication_desired
        self.put_threads = put_threads

        if apiconfig:
            self._config = apiconfig
        else:
            self._config = config.settings()

        self.num_retries = num_retries if num_retries is not None else 0
        self._manifest_locator = None
        self._manifest_text = None
        self._portable_data_hash = None
        self._api_response = None
        # (modified_at, portable_data_hash) pairs already seen/merged.
        self._past_versions = set()

        # One lock protects the whole tree of subcollections and files.
        self.lock = threading.RLock()

        if manifest_locator_or_text:
            # Classify the argument: block locator, collection UUID, or raw
            # manifest text.
            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
            else:
                raise errors.ArgumentError(
                    "Argument to CollectionReader is not a manifest or a collection UUID")

            try:
                self._populate()
            except (IOError, errors.SyntaxError) as e:
                raise errors.ArgumentError("Error processing manifest text: %s", e)
1244 def root_collection(self):
1247 def stream_name(self):
1254 def known_past_version(self, modified_at_and_portable_data_hash):
1255 return modified_at_and_portable_data_hash in self._past_versions
    def update(self, other=None, num_retries=None):
        """Merge the latest collection on the API server with the current collection.

        :other:
          Collection to merge from; when None, fetch the current API record
          for this collection's UUID and merge that instead.
        :num_retries:
          retry count for the API request (when fetching).

        """
        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
                response.get("portable_data_hash") != self.portable_data_hash()):
                # The record on the server is different from our current one, but we've seen it before,
                # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the update, because we want to update
                # our tokens.
                return
            else:
                self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
            other = CollectionReader(response["manifest_text"])
        # Three-way merge: diff our baseline against `other`, then apply.
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        self._manifest_text = self.manifest_text()
1282 if self._api_client is None:
1283 self._api_client = ThreadSafeApiCache(self._config)
1284 if self._keep_client is None:
1285 self._keep_client = self._api_client.keep
1286 return self._api_client
1290 if self._keep_client is None:
1291 if self._api_client is None:
1294 self._keep_client = KeepClient(api_client=self._api_client)
1295 return self._keep_client
1298 def _my_block_manager(self):
1299 if self._block_manager is None:
1300 copies = (self.replication_desired or
1301 self._my_api()._rootDesc.get('defaultCollectionReplication',
1303 self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1304 return self._block_manager
1306 def _remember_api_response(self, response):
1307 self._api_response = response
1308 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
    def _populate_from_api_server(self):
        """Fetch manifest text and PDH from the API server.

        Returns None on success, or the exception on failure (callers use the
        return value to decide whether to fall back to Keep).
        """
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            self._remember_api_response(self._my_api().collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries))
            self._manifest_text = self._api_response['manifest_text']
            self._portable_data_hash = self._api_response['portable_data_hash']
            # If not overriden via kwargs, we should try to load the
            # replication_desired from the API server
            if self.replication_desired is None:
                self.replication_desired = self._api_response.get('replication_desired', None)
        except Exception as e:
            return e
    def _populate_from_keep(self):
        """Fetch manifest text directly from Keep.

        Returns None on success, or the exception on failure.
        """
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e
    def _populate(self):
        """Load manifest text from the API server and/or Keep, then import it.

        Tries Keep first for signed locators, then the API server, then Keep
        again for plain block locators; raises NotFoundError when every
        source fails.
        """
        if self._manifest_locator is None and self._manifest_text is None:
            # Nothing to load: empty collection.
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           arvados.util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            arvados.util.signed_locator_pattern.match(self._manifest_locator)):
            # Signed locator: Keep can serve it directly.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        # populate
        self._baseline_manifest = self._manifest_text
        self._import_manifest(self._manifest_text)
1377 def _has_collection_uuid(self):
1378 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1380 def __enter__(self):
1383 def __exit__(self, exc_type, exc_value, traceback):
1384 """Support scoped auto-commit in a with: block."""
1385 if exc_type is None:
1386 if self.writable() and self._has_collection_uuid():
1390 def stop_threads(self):
1391 if self._block_manager is not None:
1392 self._block_manager.stop_threads()
1395 def manifest_locator(self):
1396 """Get the manifest locator, if any.
1398 The manifest locator will be set when the collection is loaded from an
1399 API server record or the portable data hash of a manifest.
1401 The manifest locator will be None if the collection is newly created or
1402 was created directly from manifest text. The method `save_new()` will
1403 assign a manifest locator.
1406 return self._manifest_locator
1409 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1410 if new_config is None:
1411 new_config = self._config
1413 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1415 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1417 newcollection._clonefrom(self)
1418 return newcollection
1421 def api_response(self):
1422 """Returns information about this Collection fetched from the API server.
1424 If the Collection exists in Keep but not the API server, currently
1425 returns None. Future versions may provide a synthetic response.
1428 return self._api_response
1430 def find_or_create(self, path, create_type):
1431 """See `RichCollectionBase.find_or_create`"""
1435 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1437 def find(self, path):
1438 """See `RichCollectionBase.find`"""
1442 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1444 def remove(self, path, recursive=False):
1445 """See `RichCollectionBase.remove`"""
1447 raise errors.ArgumentError("Cannot remove '.'")
1449 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
    def save(self, merge=True, num_retries=None):
        """Save collection to an existing collection record.

        Commit pending buffer blocks to Keep, merge with remote record (if
        merge=True, the default), and update the collection record.  Returns
        the current manifest text.

        Will raise AssertionError if not associated with a collection record
        on the API server.  If you want to save a manifest to Keep only, see
        `save_new()`.

        :merge:
          Update and merge remote changes before saving.  Otherwise, any
          remote changes will be ignored and overwritten.

        :num_retries:
          Retry count on API calls (if None, use the collection default)

        """
        if not self.committed():
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")

            self._my_block_manager().commit_all()

            if merge:
                # Pull in remote changes first so we don't clobber them.
                self.update()

            text = self.manifest_text(strip=False)
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body={'manifest_text': text}
                ).execute(
                    num_retries=num_retries))
            self._manifest_text = self._api_response["manifest_text"]
            self._portable_data_hash = self._api_response["portable_data_hash"]
            self.set_committed(True)

        return self._manifest_text
    def save_new(self, name=None,
                 create_collection_record=True,
                 owner_uuid=None,
                 ensure_unique_name=False,
                 num_retries=None):
        """Save collection to a new collection record.

        Commit pending buffer blocks to Keep and, when create_collection_record
        is True (default), create a new collection record.  After creating a
        new collection record, this Collection object will be associated with
        the new record used by `save()`.  Returns the current manifest text.

        :name:
          The collection name.

        :create_collection_record:
          If True, create a collection record on the API server.
          If False, only commit blocks to Keep and return the manifest text.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner.  If
          False, a name conflict will result in an error.

        :num_retries:
          Retry count on API calls (if None, use the collection default)

        """
        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)

        if create_collection_record:
            if name is None:
                # Auto-generated name: let the server de-duplicate it.
                name = "New collection"
                ensure_unique_name = True

            body = {"manifest_text": text,
                    "name": name,
                    "replication_desired": self.replication_desired}
            if owner_uuid:
                body["owner_uuid"] = owner_uuid

            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
            text = self._api_response["manifest_text"]

            self._manifest_locator = self._api_response["uuid"]
            self._portable_data_hash = self._api_response["portable_data_hash"]

            self._manifest_text = text
            self.set_committed(True)

        return text
1556 def _import_manifest(self, manifest_text):
1557 """Import a manifest into a `Collection`.
1560 The manifest text to import from.
1564 raise ArgumentError("Can only import manifest into an empty collection")
1573 for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1574 tok = token_and_separator.group(1)
1575 sep = token_and_separator.group(2)
1577 if state == STREAM_NAME:
1578 # starting a new stream
1579 stream_name = tok.replace('\\040', ' ')
1584 self.find_or_create(stream_name, COLLECTION)
1588 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1590 blocksize = int(block_locator.group(1))
1591 blocks.append(Range(tok, streamoffset, blocksize, 0))
1592 streamoffset += blocksize
1596 if state == SEGMENTS:
1597 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1599 pos = int(file_segment.group(1))
1600 size = int(file_segment.group(2))
1601 name = file_segment.group(3).replace('\\040', ' ')
1602 filepath = os.path.join(stream_name, name)
1603 afile = self.find_or_create(filepath, FILE)
1604 if isinstance(afile, ArvadosFile):
1605 afile.add_segment(blocks, pos, size)
1607 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1610 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1616 self.set_committed(True)
1619 def notify(self, event, collection, name, item):
1621 self._callback(event, collection, name, item)
class Subcollection(RichCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record.

    Subcollection locking falls under the umbrella lock of its root collection.

    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        # Share the root collection's lock: one lock protects the whole tree.
        self.lock = self.root_collection().lock
        self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries

    def root_collection(self):
        # Walk up to the parent; only the top-level Collection returns itself.
        return self.parent.root_collection()

    def writable(self):
        return self.root_collection().writable()

    def _my_api(self):
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def stream_name(self):
        # Manifest stream name is the parent's stream name plus this directory.
        return os.path.join(self.parent.stream_name(), self.name)

    def clone(self, new_parent, new_name):
        """Deep-copy this subdirectory under `new_parent` as `new_name`."""
        c = Subcollection(new_parent, new_name)
        c._clonefrom(self)
        return c

    def _reparent(self, newparent, newname):
        """Detach this subcollection from its parent and re-attach it under
        `newparent` as `newname`."""
        self.set_committed(False)
        # NOTE(review): upstream flushes pending blocks before the move —
        # confirm against the full source (line fell in a gap here).
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        # Re-bind to the (possibly different) new root's lock.
        self.lock = self.parent.root_collection().lock
class CollectionReader(Collection):
    """A read-only collection object.

    Initialize from an api collection record locator, a portable data hash of
    a manifest, or raw manifest text.  See `Collection` constructor for
    detailed options.

    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        """Read-only collection constructor; same arguments as `Collection`,
        but `manifest_locator_or_text` is required."""
        # Writable only while the parent constructor populates the tree;
        # see `writable()`.
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()

        # Backwards compatability with old CollectionReader
        # all_streams() and all_files()
        self._streams = None
1695 return self._in_init
    def _populate_streams(orig_func):
        # Decorator: lazily split the manifest into per-stream token lists
        # before running the wrapped legacy-API method.
        @functools.wraps(orig_func)
        def populate_streams_wrapper(self, *args, **kwargs):
            # Defer populating self._streams until needed since it creates a copy of the manifest.
            if self._streams is None:
                if self._manifest_text:
                    self._streams = [sline.split()
                                     for sline in self._manifest_text.split("\n")
                                     if sline]
                else:
                    self._streams = []
            return orig_func(self, *args, **kwargs)
        return populate_streams_wrapper
    def normalize(self):
        """Normalize the streams returned by `all_streams`.

        This method is kept for backwards compatability and only affects the
        behavior of `all_streams()` and `all_files()`

        """

        # Rearrange streams: group every file's locator ranges under its
        # (stream, file) name, then rebuild one normalized stream per name.
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]
1735 def all_streams(self):
1736 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1737 for s in self._streams]
1740 def all_files(self):
1741 for s in self.all_streams():
1742 for f in s.all_files():