10 from collections import deque
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
23 from arvados.retry import retry_method
25 _logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    """Abstract base for collection objects.

    Subclasses provide ``_api_client``, ``_keep_client``, ``num_retries``
    and ``manifest_text()``.
    """

    def __exit__(self, exc_type, exc_value, traceback):

        # NOTE(review): the lines below look like the body of a lazy
        # Keep-client accessor (client is built on first use) — confirm
        # against the full file.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """Get the manifest with locator hints stripped.

        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        for line in raw.split("\n"):
            # Keep the stream-name field, then strip every non-size hint
            # (anything after '+' that does not start with a digit) from
            # each field that looks like a Keep locator.
            clean_fields = fields[:1] + [
                (re.sub(r'\+[^\d][^\+]*', '', x)
                 if re.match(util.keep_locator_pattern, x)
            clean += [' '.join(clean_fields), "\n"]
class _WriterFile(_FileLikeObjectBase):
    """Write-only file-like object returned by CollectionWriter.open().

    Forwards all writes to the owning CollectionWriter (``dest``).
    """

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

        # NOTE(review): the next two lines appear to belong to close():
        # finish the current file in the writer after closing — confirm.
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):

    @_FileLikeObjectBase._before_close
        # Flush buffered data through the owning writer.
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when

        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        # 2 is the fallback replication when the caller gives none.
        self.replication = (2 if replication is None else replication)
        self._keep_client = None  # built lazily
        # Data buffered in memory, not yet stored as a Keep block.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the manifest stream currently being assembled.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work-queue state; see do_queued_work() for the queue discipline.
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
    def __exit__(self, exc_type, exc_value, traceback):

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains. The _work_THING methods each do a unit of work on
        # THING. _queue_THING methods add a THING to the work queue.
        if self._queued_file:
        elif self._queued_dirents:
        elif self._queued_trees:
    def _work_file(self):
        # Stream the queued file into the collection one Keep block at a
        # time, then close it (if we opened it) and clear the queue slot.
        buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
        self.finish_current_file()
        self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                # Subdirectory: queue it as its own stream, one level deeper.
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
                # NOTE(review): expected an else: branch queueing regular
                # files here — verify against the full file.
                self._queue_file(target, dirent)
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # max_manifest_depth == 0 flattens the whole tree into one stream.
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        self._queue_dirents(stream_name, d)
        self._queued_trees.popleft()
    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            # Got a path: we opened the file, so we must close it later.
            source = open(source, 'rb')
            self._close_file = True
        # NOTE(review): this assignment presumably sits in an else: branch
        # (caller-supplied file objects are not closed by us) — confirm.
        self._close_file = False
        # NOTE(review): presumably only used when filename is None — confirm.
        filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source
202 def _queue_dirents(self, stream_name, dirents):
203 assert (not self._queued_dirents), "tried to queue more than one tree"
204 self._queued_dirents = deque(sorted(dirents))
206 def _queue_tree(self, path, stream_name, max_manifest_depth):
207 self._queued_trees.append((path, stream_name, max_manifest_depth))
    def write_file(self, source, filename=None):
        """Queue a single file (path or file-like object) and write it out."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Queue a directory tree and write it out as collection streams."""
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        # Non-string iterables are handled element by element (elided here).
        if hasattr(newdata, '__iter__'):
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        # Flush whole Keep blocks as soon as enough data has accumulated.
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done. Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        # NOTE(review): presumably only split when filename is None — confirm.
        streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open
    def flush_data(self):
        """Store one Keep block's worth of buffered data (elided: Keep put)."""
        data_buffer = ''.join(self._data_buffer)
        self._current_stream_locators.append(
            data_buffer[0:config.KEEP_BLOCK_SIZE],
            copies=self.replication))
        # Keep any remainder beyond one block in the buffer.
        self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
        self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        """Finish the current file and begin a new (optionally named) one."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        # Manifest file tokens are whitespace-delimited, so the name must
        # not contain tabs/newlines; NULs are likewise forbidden.
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
        self._current_file_name = newfilename

    def current_file_name(self):
        """Return the name of the file currently being written."""
        return self._current_file_name
    def finish_current_file(self):
        """Record the current file's extent in the stream's file list."""
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        # Manifest file token: [start offset, length, name].
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        # Next file starts where this one ended.
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None
    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and begin a new one (default '.')."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
307 def set_current_stream_name(self, newstreamname):
308 if re.search(r'[\t\n]', newstreamname):
309 raise errors.AssertionError(
310 "Manifest stream names cannot contain whitespace")
311 self._current_stream_name = '.' if newstreamname=='' else newstreamname
    def current_stream_name(self):
        """Return the name of the stream currently being written."""
        return self._current_stream_name
    def finish_current_stream(self):
        """Flush remaining data and record the stream's manifest entry."""
        self.finish_current_file()
        if not self._current_stream_files:
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        # A stream with files but no data still needs a locator: use the
        # well-known empty block.
        if not self._current_stream_locators:
            self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
        self._finished_streams.append([self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files])
        # Reset per-stream state for the next stream.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
339 """Store the manifest in Keep and return its locator.
341 This is useful for storing manifest fragments (task outputs)
342 temporarily in Keep during a Crunch job.
344 In other cases you should make a collection instead, by
345 sending manifest_text() to the API server's "create
346 collection" endpoint.
348 return self._my_keep().put(self.manifest_text(), copies=self.replication)
350 def portable_data_hash(self):
351 stripped = self.stripped_manifest()
352 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
    def manifest_text(self):
        """Assemble and return the manifest text for all finished streams."""
        self.finish_current_stream()
        for stream in self._finished_streams:
            # Stream names are recorded relative to '.'.
            if not re.search(r'^\.(/.*)?$', stream[0]):
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])

    def data_locators(self):
        """Return the Keep locators referenced by finished streams."""
        for name, locators, files in self._finished_streams:
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose progress can be saved and restored.

    dump_state() snapshots the writer; from_state() rebuilds it, so an
    interrupted upload can resume as long as its source files are unchanged.
    """

    # Attributes that fully describe the writer's progress; saved by
    # dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        # Maps source path -> os.stat() tuple, used to detect file changes.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError. Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            # Reopen the in-progress source file at its saved position.
            writer._queued_file = open(path, 'rb')
            writer._queued_file.seek(pos)
        except IOError as error:
            raise errors.StaleWriterStateError(
                "failed to reopen active file {}: {}".format(path, error))
    def check_dependencies(self):
        """Raise StaleWriterStateError if any recorded source file changed."""
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            now_stat = tuple(os.stat(path))
        except OSError as error:
            raise errors.StaleWriterStateError(
                "failed to stat {}: {}".format(path, error))
            # Changed type, mtime, or size all invalidate the saved state.
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))
    def dump_state(self, copy_func=lambda x: x):
        """Return a snapshot dict of writer state for later from_state().

        copy_func may be supplied to (deep-)copy each saved value.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        # Otherwise record the open source file's real path and position.
        state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                  self._queued_file.tell())
    def _queue_file(self, source, filename=None):
        # Resumable writers only accept real file paths, so the source can
        # be reopened at the same position when resuming.
        src_path = os.path.realpath(source)
        raise errors.AssertionError("{} not a file path".format(source))
        path_stat = os.stat(src_path)
        except OSError as stat_error:
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            # The path was replaced between open() and stat().
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        self._dependencies[src_path] = tuple(fd_stat)
467 def write(self, data):
468 if self._queued_file is None:
469 raise errors.AssertionError(
470 "resumable writer can't accept unsourced data")
471 return super(ResumableCollectionWriter, self).write(data)
479 COLLECTION = "collection"
class RichCollectionBase(CollectionBase):
    """Base class for Collections and Subcollections.

    Implements the majority of functionality relating to accessing items in the
    """

    def __init__(self, parent=None):
        self._committed = False
        self._callback = None

        # NOTE(review): the bare raise statements below appear to be the
        # bodies of abstract hook methods whose def lines are not shown.
        raise NotImplementedError()
        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()
        raise NotImplementedError()

    def root_collection(self):
        raise NotImplementedError()

    def notify(self, event, collection, name, item):
        raise NotImplementedError()

    def stream_name(self):
        raise NotImplementedError()
    def find_or_create(self, path, create_type):
        """Recursively search the specified file path.

        May return either a `Collection` or `ArvadosFile`. If not found, will
        create a new item at the specified path based on `create_type`. Will
        create intermediate subcollections needed to contain the final item in

        :create_type:
          One of `arvados.collection.FILE` or
          `arvados.collection.COLLECTION`. If the path is not found, and value
          of create_type is FILE then create and return a new ArvadosFile for
          the last path component. If COLLECTION, then create and return a new
          Collection for the last path component.
        """
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                if create_type == COLLECTION:
                    item = Subcollection(self, pathcomponents[0])
                item = ArvadosFile(self, pathcomponents[0])
                self._items[pathcomponents[0]] = item
                self._committed = False
                self.notify(ADD, self, pathcomponents[0], item)
                # create new collection
                item = Subcollection(self, pathcomponents[0])
                self._items[pathcomponents[0]] = item
                self._committed = False
                self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    return item.find_or_create(pathcomponents[1], create_type)
                raise IOError(errno.ENOTDIR, "Not a directory: '%s'" % pathcomponents[0])
    def find(self, path):
        """Recursively search the specified file path.

        May return either a Collection or ArvadosFile. Return None if not
        """
        raise errors.ArgumentError("Parameter 'path' is empty.")
        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        if len(pathcomponents) == 1:
        if isinstance(item, RichCollectionBase):
            if pathcomponents[1]:
                # Descend into the subcollection for the remaining path.
                return item.find(pathcomponents[1])
        raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
588 def mkdirs(self, path):
589 """Recursive subcollection create.
591 Like `os.makedirs()`. Will create intermediate subcollections needed
592 to contain the leaf subcollection path.
596 if self.find(path) != None:
597 raise IOError(errno.EEXIST, "Directory or file exists: '%s'" % path)
599 return self.find_or_create(path, COLLECTION)
    def open(self, path, mode="r"):
        """Open a file-like object for access.

        :path:
          path to a file in the collection
        :mode:
          one of "r", "r+", "w", "w+", "a", "a+"
          opens for reading and writing. Reads/writes share a file pointer.
          truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
          opens for reading and writing. All writes are appended to
          the end of the file. Writing does not affect the file pointer for
        """
        # Binary mode is implied; drop the flag before validating.
        mode = mode.replace("b", "")
        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
            raise errors.ArgumentError("Bad mode '%s'" % mode)
        # Any mode other than plain read may need to create the file.
        create = (mode != "r")

        if create and not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")
        arvfile = self.find_or_create(path, FILE)
        arvfile = self.find(path)
        raise IOError(errno.ENOENT, "File not found")
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Is a directory: %s" % path)
        name = os.path.basename(path)
        return ArvadosFileReader(arvfile, num_retries=self.num_retries)
        return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
648 """Determine if the collection has been modified since last commited."""
649 return not self.committed()
653 """Determine if the collection has been committed to the API server."""
655 if self._committed is False:
657 for v in self._items.values():
658 if v.committed() is False:
663 def set_committed(self):
664 """Recursively set committed flag to True."""
665 self._committed = True
666 for k,v in self._items.items():
671 """Iterate over names of files and collections contained in this collection."""
672 return iter(self._items.keys())
    def __getitem__(self, k):
        """Get a file or collection that is directly contained by this collection.

        If you want to search a path, use `find()` instead.
        """
        return self._items[k]

    def __contains__(self, k):
        """Test if there is a file or collection directly contained by this collection."""
        return k in self._items

        """Get the number of items directly contained in this collection."""
        return len(self._items)

    def __delitem__(self, p):
        """Delete an item by name which is directly contained by this collection."""
        self._committed = False
        self.notify(DEL, self, p, None)

        """Get a list of names of files and collections directly contained in this collection."""
        return self._items.keys()

        """Get a list of files and collection objects directly contained in this collection."""
        return self._items.values()

        """Get a list of (name, object) tuples directly contained in this collection."""
        return self._items.items()

    def exists(self, path):
        """Test if there is a file or collection at `path`."""
        return self.find(path) is not None
    def remove(self, path, recursive=False):
        """Remove the file or subcollection (directory) at `path`.

        :recursive:
          Specify whether to remove non-empty subcollections (True), or raise an error (False).
        """
        raise errors.ArgumentError("Parameter 'path' is empty.")
        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        raise IOError(errno.ENOENT, "File not found")
        if len(pathcomponents) == 1:
            # Refuse to delete a non-empty subcollection unless recursive.
            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
            deleteditem = self._items[pathcomponents[0]]
            del self._items[pathcomponents[0]]
            self._committed = False
            self.notify(DEL, self, pathcomponents[0], deleteditem)
        item.remove(pathcomponents[1])
746 def _clonefrom(self, source):
747 for k,v in source.items():
748 self._items[k] = v.clone(self, k)
        # Abstract: subclasses must implement clone().
        raise NotImplementedError()
    def add(self, source_obj, target_name, overwrite=False, reparent=False):
        """Copy or move a file or subcollection to this collection.

        :source_obj:
          An ArvadosFile, or Subcollection object
        :target_name:
          Destination item name. If the target name already exists and is a
          file, this will raise an error unless you specify `overwrite=True`.
        :overwrite:
          Whether to overwrite target file if it already exists.
        :reparent:
          If True, source_obj will be moved from its parent collection to this collection.
          If False, source_obj will be copied and the parent collection will be
        """
        if target_name in self and not overwrite:
            raise IOError(errno.EEXIST, "File already exists")

        # Remember what we replaced, so notify() can report a modification.
        if target_name in self:
            modified_from = self[target_name]

        # Actually make the move or copy.
        source_obj._reparent(self, target_name)
        item = source_obj.clone(self, target_name)
        self._items[target_name] = item
        self._committed = False
        self.notify(MOD, self, target_name, (modified_from, item))
        self.notify(ADD, self, target_name, item)
    def _get_src_target(self, source, target_path, source_collection, create_dest):
        """Resolve (source_obj, target_dir, target_name) for copy()/rename()."""
        if source_collection is None:
            source_collection = self

        # Resolve the source: a path string is looked up, an object is
        # used as-is.
        if isinstance(source, basestring):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError(errno.ENOENT, "File not found")
            sourcecomponents = source.split("/")
        sourcecomponents = None

        # Find parent collection the target path
        targetcomponents = target_path.split("/")
        # Determine the name to use.
        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
        raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
        if len(targetcomponents) > 1:
            target_dir = self.find("/".join(targetcomponents[0:-1]))

        if target_dir is None:
            raise IOError(errno.ENOENT, "Target directory not found.")

        # If the target is an existing subcollection, place the item inside
        # it under the source's own name.
        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        return (source_obj, target_dir, target_name)
    def copy(self, source, target_path, source_collection=None, overwrite=False):
        """Copy a file or subcollection to a new path in this collection.

        :source:
          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
        :target_path:
          Destination file or path. If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection. If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.
        :source_collection:
          Collection to copy `source_path` from (default `self`)
        :overwrite:
          Whether to overwrite target file if it already exists.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
        target_dir.add(source_obj, target_name, overwrite, False)
    def rename(self, source, target_path, source_collection=None, overwrite=False):
        """Move a file or subcollection from `source_collection` to a new path in this collection.

        :source:
          A string with a path to source file or subcollection.
        :target_path:
          Destination file or path. If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection. If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.
        :source_collection:
          Collection to copy `source_path` from (default `self`)
        :overwrite:
          Whether to overwrite target file if it already exists.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
        if not source_obj.writable():
            raise IOError(errno.EROFS, "Source collection is read only.")
        # reparent=True detaches the item from its original parent.
        target_dir.add(source_obj, target_name, overwrite, True)
    def portable_manifest_text(self, stream_name="."):
        """Get the manifest text for this collection, sub collections and files.

        This method does not flush outstanding blocks to Keep. It will return
        a normalized manifest with access tokens stripped.

        :stream_name:
          Name to use for this stream (directory)
        """
        return self._get_manifest_text(stream_name, True, True)
    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        """Get the manifest text for this collection, sub collections and files.

        This method will flush outstanding blocks to Keep. By default, it will
        not normalize an unmodified manifest or strip access tokens.

        :stream_name:
          Name to use for this stream (directory)
        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.
        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the collection
          is not modified, return the original manifest text even if it is not
        """
        self._my_block_manager().commit_all()
        return self._get_manifest_text(stream_name, strip, normalize)
    def _get_manifest_text(self, stream_name, strip, normalize):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name to use for this stream (directory)
        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.
        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the collection
          is not modified, return the original manifest text even if it is not
        """
        if not self.committed() or self._manifest_text is None or normalize:
            # Rebuild the manifest from the live item tree.
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `k`
                arvfile = self[filename]
                for segment in arvfile.segments():
                    loc = segment.locator
                    # Bufferblocks may have been committed since the segment
                    # was recorded; substitute the real locator if so.
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                                      segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            # Subcollections are emitted as their own (normalized) streams.
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
        return self.stripped_manifest()
        return self._manifest_text
    def diff(self, end_collection, prefix=".", holding_collection=None):
        """Generate list of add/modify/delete actions.

        When given to `apply`, will change `self` to match `end_collection`
        """
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        if k not in end_collection:
            changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
            # Recurse into subcollections present on both sides.
            if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
            elif end_collection[k] != self[k]:
                changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
    def apply(self, changes):
        """Apply changes from `diff`.

        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.
        """
        self._committed = False
        for change in changes:
            event_type = change[0]
            local = self.find(path)
            # Conflicting changes land in a timestamped "~conflict~" path.
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
            if event_type == ADD:
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    # Overwrite path with new item; this can happen if
                    # path was a file and is now a collection or vice versa
                    self.copy(final, path, overwrite=True)
                # Local is missing (presumably deleted) or local doesn't
                # match the "start" value, so save change to conflict file
                self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
1045 def portable_data_hash(self):
1046 """Get the portable data hash for this collection's manifest."""
1047 stripped = self.portable_manifest_text()
1048 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
    def subscribe(self, callback):
        """Register a callback to be invoked on collection change events."""
        if self._callback is None:
            self._callback = callback
        raise errors.ArgumentError("A callback is already set on this collection.")

    def unsubscribe(self):
        """Remove the registered change callback, if any."""
        if self._callback is not None:
            self._callback = None
    def notify(self, event, collection, name, item):
        # Invoke our own callback, then propagate the event to the root.
        self._callback(event, collection, name, item)
        self.root_collection().notify(event, collection, name, item)
    def __eq__(self, other):
        """Equality: both sides contain the same names mapping to equal items."""
        if not isinstance(other, RichCollectionBase):
        if len(self._items) != len(other):
        for k in self._items:
            if self._items[k] != other[k]:

    def __ne__(self, other):
        return not self.__eq__(other)
1088 """Flush bufferblocks to Keep."""
1089 for e in self.values():
class Collection(RichCollectionBase):
    """Represents the root of an Arvados Collection.

    This class is threadsafe. The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the entire
    collection).

    Brief usage guide:

    :To read an existing file:
      `c.open("myfile", "r")`

    :To write a new file:
      `c.open("myfile", "w")`

    :To determine if a file exists:
      `c.find("myfile") is not None`

    :To copy a file:
      `c.copy("source", "dest")`

    :To delete a file:
      `c.remove("myfile")`

    :To save to an existing collection record:
      `c.save()`

    :To save a new collection record:
      `c.save_new()`

    :To merge remote changes into this object:
      `c.update()`

    Must be associated with an API server Collection record (during
    initialization, or using `save_new`) to use `save` or `update`.

    """
    def __init__(self, manifest_locator_or_text=None,
                 block_manager=None):
        # NOTE(review): several parameters (parent, apiconfig, api_client,
        # keep_client, num_retries) appear elided from this signature in the
        # current view; the body below references them -- confirm against the
        # full source.
        """Collection constructor.

        :manifest_locator_or_text:
          One of: an Arvados collection UUID, a block locator of
          a manifest, raw manifest text, or None (to create an empty collection).

        :parent:
          the parent Collection, may be None.

        :apiconfig:
          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.

        :api_client:
          The API client object to use for requests. If not specified, create one using `apiconfig`.

        :keep_client:
          the Keep client to use for requests. If not specified, create one using `apiconfig`.

        :num_retries:
          the number of retries for API and Keep requests.

        :block_manager:
          the block manager to use. If not specified, create one.

        """
        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client
        self._block_manager = block_manager

        # NOTE(review): an `if apiconfig: ... else: ...` appears elided around
        # these two assignments; as written the second always wins.
        self._config = apiconfig
        self._config = config.settings()

        self.num_retries = num_retries if num_retries is not None else 0
        self._manifest_locator = None
        self._manifest_text = None
        self._api_response = None
        # Version tuples (modified_at, portable_data_hash) already merged.
        self._past_versions = set()

        # One re-entrant lock guards the whole collection tree.
        self.lock = threading.RLock()

        if manifest_locator_or_text:
            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
            # NOTE(review): an `else:` appears elided before this raise.
            raise errors.ArgumentError(
                "Argument to CollectionReader is not a manifest or a collection UUID")

            # NOTE(review): a `try: self._populate()` appears elided before
            # this exception handler.
            except (IOError, errors.SyntaxError) as e:
                raise errors.ArgumentError("Error processing manifest text: %s", e)
1195 def root_collection(self):
1198 def stream_name(self):
1205 def known_past_version(self, modified_at_and_portable_data_hash):
1206 return modified_at_and_portable_data_hash in self._past_versions
    def update(self, other=None, num_retries=None):
        """Merge the latest collection on the API server with the current collection."""
        # NOTE(review): the branch handling an explicit `other` argument
        # appears elided from this view; only the fetch-from-API path is shown.
        if self._manifest_locator is None:
            raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
        response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
        if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
            response.get("portable_data_hash") != self.portable_data_hash()):
            # The record on the server is different from our current one, but we've seen it before,
            # so ignore it because it's already been merged.
            # However, if it's the same as our current record, proceed with the update, because we want to update
            # NOTE(review): the body of this branch (an early return) appears elided.
        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
        other = CollectionReader(response["manifest_text"])
        # Three-way merge: diff our baseline against the remote copy and apply.
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        self._manifest_text = self.manifest_text()
1233 if self._api_client is None:
1234 self._api_client = ThreadSafeApiCache(self._config)
1235 self._keep_client = self._api_client.keep
1236 return self._api_client
1240 if self._keep_client is None:
1241 if self._api_client is None:
1244 self._keep_client = KeepClient(api_client=self._api_client)
1245 return self._keep_client
1248 def _my_block_manager(self):
1249 if self._block_manager is None:
1250 self._block_manager = _BlockManager(self._my_keep())
1251 return self._block_manager
1253 def _remember_api_response(self, response):
1254 self._api_response = response
1255 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server. If we do build one, make sure our Keep client uses
        # it. If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        # NOTE(review): the enclosing `try:` and the success/failure `return`
        # statements appear elided from this view.
        self._remember_api_response(self._my_api().collections().get(
            uuid=self._manifest_locator).execute(
                num_retries=self.num_retries))
        self._manifest_text = self._api_response['manifest_text']
        except Exception as e:
    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        # NOTE(review): the enclosing `try:` and the `return` statements
        # appear elided from this view.
        self._manifest_text = self._my_keep().get(
            self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
    def _populate(self):
        # Load manifest content from the API server and/or Keep, depending on
        # what kind of locator we hold, then import it into this object.
        # NOTE(review): the early-return for the "nothing to load" case and
        # several raise/format lines appear elided from this view.
        if self._manifest_locator is None and self._manifest_text is None:
        error_via_api = None
        error_via_keep = None
        # Keep is only worth trying when the locator looks like a block locator.
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            # Signed locator: Keep can serve it directly.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
        if ((self._manifest_text is None) and
            not error_via_keep and
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked: report both failure modes.
            raise errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 self._manifest_locator,
        # Keep the loaded text as the baseline for future diffs/merges.
        self._baseline_manifest = self._manifest_text
        self._import_manifest(self._manifest_text)
    def _has_collection_uuid(self):
        # True when the locator is an API collection UUID (as opposed to a raw
        # manifest or a block locator). Note: returns the re.Match object
        # (truthy) rather than a strict bool.
        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1322 def __enter__(self):
1325 def __exit__(self, exc_type, exc_value, traceback):
1326 """Support scoped auto-commit in a with: block."""
1327 if exc_type is None:
1328 if self.writable() and self._has_collection_uuid():
1332 def stop_threads(self):
1333 if self._block_manager is not None:
1334 self._block_manager.stop_threads()
    def manifest_locator(self):
        """Get the manifest locator, if any.

        The manifest locator will be set when the collection is loaded from an
        API server record or the portable data hash of a manifest.

        The manifest locator will be None if the collection is newly created or
        was created directly from manifest text. The method `save_new()` will
        assign a manifest locator.

        """
        return self._manifest_locator
1351 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1352 if new_config is None:
1353 new_config = self._config
1355 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1357 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1359 newcollection._clonefrom(self)
1360 return newcollection
    def api_response(self):
        """Returns information about this Collection fetched from the API server.

        If the Collection exists in Keep but not the API server, currently
        returns None. Future versions may provide a synthetic response.

        """
        return self._api_response
1372 def find_or_create(self, path, create_type):
1373 """See `RichCollectionBase.find_or_create`"""
1377 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1379 def find(self, path):
1380 """See `RichCollectionBase.find`"""
1384 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1386 def remove(self, path, recursive=False):
1387 """See `RichCollectionBase.remove`"""
1389 raise errors.ArgumentError("Cannot remove '.'")
1391 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
    def save(self, merge=True, num_retries=None):
        """Save collection to an existing collection record.

        Commit pending buffer blocks to Keep, merge with remote record (if
        merge=True, the default), and update the collection record. Returns
        the current manifest text.

        Will raise AssertionError if not associated with a collection record on
        the API server. If you want to save a manifest to Keep only, see
        `save_new()`.

        :merge:
          Update and merge remote changes before saving. Otherwise, any
          remote changes will be ignored and overwritten.

        :num_retries:
          Retry count on API calls (if None, use the collection default)

        """
        if not self.committed():
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
            # NOTE(review): the `if merge: self.update()` step appears elided
            # from this view.
            self._my_block_manager().commit_all()
            text = self.manifest_text(strip=False)
            # NOTE(review): an `).execute(` line appears elided between the
            # request body and num_retries below.
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body={'manifest_text': text}
                num_retries=num_retries))
            self._manifest_text = self._api_response["manifest_text"]
            self.set_committed()

        return self._manifest_text
    def save_new(self, name=None,
                 create_collection_record=True,
                 ensure_unique_name=False,
        # NOTE(review): additional parameters (owner_uuid, num_retries) and
        # the closing of this signature appear elided from this view.
        """Save collection to a new collection record.

        Commit pending buffer blocks to Keep and, when create_collection_record
        is True (default), create a new collection record. After creating a
        new collection record, this Collection object will be associated with
        the new record used by `save()`. Returns the current manifest text.

        :name:
          The collection name.

        :create_collection_record:
          If True, create a collection record on the API server.
          If False, only commit blocks to Keep and return the manifest text.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner. If
          False, a name conflict will result in an error.

        :num_retries:
          Retry count on API calls (if None, use the collection default)

        """
        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)

        if create_collection_record:
            # NOTE(review): a guard (presumably `if name is None:`) appears
            # elided before these defaults.
            name = "New collection"
            ensure_unique_name = True

            # NOTE(review): the remaining entries of this dict literal (e.g.
            # "name") appear elided.
            body = {"manifest_text": text,
            body["owner_uuid"] = owner_uuid
            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
            text = self._api_response["manifest_text"]
            self._manifest_locator = self._api_response["uuid"]
            self._manifest_text = text
            self.set_committed()
            # NOTE(review): the trailing `return text` appears elided.
    def _import_manifest(self, manifest_text):
        """Import a manifest into a `Collection`.

        :manifest_text:
          The manifest text to import from.

        """
        # NOTE(review): the guard around this raise (only when the collection
        # is non-empty) appears elided; also `ArgumentError` is unqualified
        # here -- likely should be errors.ArgumentError (NameError as written).
        raise ArgumentError("Can only import manifest into an empty collection")
        # NOTE(review): the state-machine setup (STREAM_NAME/BLOCKS/SEGMENTS
        # constants, `state`, `stream_name`, `blocks`, `streamoffset`) and the
        # state-transition lines appear elided from this view.
        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
            tok = token_and_separator.group(1)
            sep = token_and_separator.group(2)

            if state == STREAM_NAME:
                # starting a new stream
                stream_name = tok.replace('\\040', ' ')
                self.find_or_create(stream_name, COLLECTION)
                # Accumulate block locators and their byte offsets in the stream.
                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                blocksize = long(block_locator.group(1))  # Python 2 `long`
                blocks.append(Range(tok, streamoffset, blocksize, 0))
                streamoffset += blocksize
            if state == SEGMENTS:
                # File segment tokens look like "pos:size:filename".
                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
                pos = long(file_segment.group(1))
                size = long(file_segment.group(2))
                name = file_segment.group(3).replace('\\040', ' ')
                filepath = os.path.join(stream_name, name)
                afile = self.find_or_create(filepath, FILE)
                if isinstance(afile, ArvadosFile):
                    afile.add_segment(blocks, pos, size)
                    # NOTE(review): an `else:` appears elided before this raise.
                    raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
            raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
        self.set_committed()
1558 def notify(self, event, collection, name, item):
1560 self._callback(event, collection, name, item)
class Subcollection(RichCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record.

    Subcollection locking falls under the umbrella lock of its root collection.

    """
1571 def __init__(self, parent, name):
1572 super(Subcollection, self).__init__(parent)
1573 self.lock = self.root_collection().lock
1574 self._manifest_text = None
1576 self.num_retries = parent.num_retries
    def root_collection(self):
        # Delegate upward; the chain terminates at the root Collection,
        # whose root_collection() returns itself.
        return self.parent.root_collection()
1582 return self.root_collection().writable()
1585 return self.root_collection()._my_api()
1588 return self.root_collection()._my_keep()
    def _my_block_manager(self):
        # Subcollections share the root collection's block manager.
        return self.root_collection()._my_block_manager()
1593 def stream_name(self):
1594 return os.path.join(self.parent.stream_name(), self.name)
1597 def clone(self, new_parent, new_name):
1598 c = Subcollection(new_parent, new_name)
1604 def _reparent(self, newparent, newname):
1605 self._committed = False
1607 self.parent.remove(self.name, recursive=True)
1608 self.parent = newparent
1610 self.lock = self.parent.root_collection().lock
class CollectionReader(Collection):
    """A read-only collection object.

    Initialize from an api collection record locator, a portable data hash of
    a manifest, or raw manifest text. See `Collection` constructor for
    detailed options.

    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        """Initialize a read-only view; see Collection.__init__ for options."""
        # _in_init suppresses writability during construction (the base
        # constructor may populate contents).
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()

        # Backwards compatibility with old CollectionReader
        # all_streams() and all_files()
        self._streams = None
1634 return self._in_init
1636 def _populate_streams(orig_func):
1637 @functools.wraps(orig_func)
1638 def populate_streams_wrapper(self, *args, **kwargs):
1639 # Defer populating self._streams until needed since it creates a copy of the manifest.
1640 if self._streams is None:
1641 if self._manifest_text:
1642 self._streams = [sline.split()
1643 for sline in self._manifest_text.split("\n")
1647 return orig_func(self, *args, **kwargs)
1648 return populate_streams_wrapper
1651 def normalize(self):
1652 """Normalize the streams returned by `all_streams`.
1654 This method is kept for backwards compatability and only affects the
1655 behavior of `all_streams()` and `all_files()`
1661 for s in self.all_streams():
1662 for f in s.all_files():
1663 streamname, filename = split(s.name() + "/" + f.name())
1664 if streamname not in streams:
1665 streams[streamname] = {}
1666 if filename not in streams[streamname]:
1667 streams[streamname][filename] = []
1668 for r in f.segments:
1669 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1671 self._streams = [normalize_stream(s, streams[s])
1672 for s in sorted(streams)]
    def all_streams(self):
        # Legacy API: wrap each tokenized manifest stream in a StreamReader.
        # NOTE(review): a @_populate_streams decorator presumably precedes
        # this def (elided from this view) so self._streams is populated.
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]
1679 def all_files(self):
1680 for s in self.all_streams():
1681 for f in s.all_files():