import errno
import functools
import hashlib
import logging
import os
import re
import threading
import time

from collections import deque
from stat import S_ISREG, ST_MODE, ST_MTIME, ST_SIZE

from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
from .keep import KeepLocator, KeepClient
from .stream import StreamReader
from ._normalize_stream import normalize_stream
from ._ranges import Range, LocatorAndRange
from .safeapi import ThreadSafeApiCache
from . import config
from . import errors
from . import util
from arvados.retry import retry_method

_logger = logging.getLogger('arvados.collection')


class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """Get the manifest with locator hints stripped.

        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
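
        For example, a locator of the form
        "d41d8cd98f00b204e9800998ecf8427e+0+A<signature>@<expiry>" is
        reduced to "d41d8cd98f00b204e9800998ecf8427e+0" (the signature
        hint shown here is illustrative).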
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)


class _WriterFile(_FileLikeObjectBase):
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        self.dest.write(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        for s in seq:
            self.dest.write(s)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.dest.flush_data()


class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're done.

        Arguments:
        * api_client: The API client to use to look up Collections. If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
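
        A minimal sketch of typical use (the file name and ``data`` are
        hypothetical)::

            cwriter = CollectionWriter()
            with cwriter.open('data.txt') as outfile:
                outfile.write(data)
            manifest = cwriter.manifest_text()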
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self.replication = (2 if replication is None else replication)
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains. The _work_THING methods each do a unit of work on
        # THING. _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done. Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(
                    data_buffer[0:config.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
339 """Store the manifest in Keep and return its locator.
341 This is useful for storing manifest fragments (task outputs)
342 temporarily in Keep during a Crunch job.
344 In other cases you should make a collection instead, by
345 sending manifest_text() to the API server's "create
346 collection" endpoint.
348 return self._my_keep().put(self.manifest_text(), copies=self.replication)

    def portable_data_hash(self):
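        """Return the portable data hash of this collection's manifest.

        This is the MD5 hex digest of the stripped manifest text, followed
        by "+" and the length in bytes of the stripped manifest.
        """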
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError. Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} is not a file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)


ADD = "add"
DEL = "del"
MOD = "mod"

FILE = "file"
COLLECTION = "collection"


class RichCollectionBase(CollectionBase):
    """Base class for Collections and Subcollections.

    Implements the majority of functionality relating to accessing items in the
    Collection.

    """

    def __init__(self, parent=None):
        self.parent = parent
        self._committed = False
        self._callback = None
        self._items = {}

    def writable(self):
        raise NotImplementedError()

    def _my_api(self):
        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

    def _my_keep(self):
        raise NotImplementedError()

    def root_collection(self):
        raise NotImplementedError()

    def notify(self, event, collection, name, item):
        raise NotImplementedError()

    def stream_name(self):
        raise NotImplementedError()

    @synchronized
    def find_or_create(self, path, create_type):
        """Recursively search the specified file path.

        May return either a `Collection` or `ArvadosFile`. If not found, will
        create a new item at the specified path based on `create_type`. Will
        create intermediate subcollections needed to contain the final item in
        the path.

        :create_type:
          One of `arvados.collection.FILE` or
          `arvados.collection.COLLECTION`. If the path is not found, and value
          of create_type is FILE then create and return a new ArvadosFile for
          the last path component. If COLLECTION, then create and return a new
          Collection for the last path component.
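
        For example (a sketch; the path is hypothetical)::

            afile = c.find_or_create("output/logs/run1.txt",
                                     arvados.collection.FILE)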

        """
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                if item is None:
                    # create new file
                    if create_type == COLLECTION:
                        item = Subcollection(self, pathcomponents[0])
                    else:
                        item = ArvadosFile(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self._committed = False
                    self.notify(ADD, self, pathcomponents[0], item)
                return item
            else:
                if item is None:
                    # create new collection
                    item = Subcollection(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self._committed = False
                    self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    return item.find_or_create(pathcomponents[1], create_type)
                else:
                    raise IOError(errno.ENOTDIR, "Not a directory: '%s'" % pathcomponents[0])
        else:
            return self

    @synchronized
    def find(self, path):
        """Recursively search the specified file path.

        May return either a Collection or ArvadosFile. Return None if not
        found.

        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        if len(pathcomponents) == 1:
            return item
        else:
            if isinstance(item, RichCollectionBase):
                if pathcomponents[1]:
                    return item.find(pathcomponents[1])
                else:
                    return item
            else:
                raise IOError(errno.ENOTDIR, "Not a directory: '%s'" % pathcomponents[0])

    @synchronized
    def mkdirs(self, path):
        """Recursive subcollection create.

        Like `os.makedirs()`. Will create intermediate subcollections needed
        to contain the leaf subcollection path.
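
        For example (a sketch; the path is hypothetical)::

            subcoll = c.mkdirs("a/b/c")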

        """
        if self.find(path) is not None:
            raise IOError(errno.EEXIST, "Directory or file exists: '%s'" % path)

        return self.find_or_create(path, COLLECTION)

    def open(self, path, mode="r"):
        """Open a file-like object for access.

        :path:
          path to a file in the collection
        :mode:
          one of "r", "r+", "w", "w+", "a", "a+"

          :"r":
            opens for reading
          :"r+":
            opens for reading and writing. Reads/writes share a file pointer.
          :"w", "w+":
            truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
          :"a", "a+":
            opens for reading and writing. All writes are appended to
            the end of the file. Writing does not affect the file pointer for
            reading.
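
        For example (a sketch; the file name is hypothetical)::

            with c.open("notes.txt", "w") as f:
                f.write("hello")
            with c.open("notes.txt") as f:
                text = f.read()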

        """
        mode = mode.replace("b", "")
        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
            raise errors.ArgumentError("Bad mode '%s'" % mode)
        create = (mode != "r")

        if create and not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")

        if create:
            arvfile = self.find_or_create(path, FILE)
        else:
            arvfile = self.find(path)

        if arvfile is None:
            raise IOError(errno.ENOENT, "File not found")
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Is a directory: %s" % path)

        if mode[0] == "w":
            arvfile.truncate(0)

        name = os.path.basename(path)

        if mode == "r":
            return ArvadosFileReader(arvfile, num_retries=self.num_retries)
        else:
            return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
647 """Determine if the collection has been modified since last commited."""
648 return not self.committed()
652 """Determine if the collection has been committed to the API server."""
654 if self._committed is False:
656 for v in self._items.values():
657 if v.committed() is False:
662 def set_committed(self):
663 """Recursively set committed flag to True."""
664 self._committed = True
665 for k,v in self._items.items():
670 """Iterate over names of files and collections contained in this collection."""
671 return iter(self._items.keys())

    @synchronized
    def __getitem__(self, k):
        """Get a file or collection that is directly contained by this collection.

        If you want to search a path, use `find()` instead.

        """
        return self._items[k]

    @synchronized
    def __contains__(self, k):
        """Test if there is a file or collection directly contained by this collection."""
        return k in self._items

    @synchronized
    def __len__(self):
        """Get the number of items directly contained in this collection."""
        return len(self._items)

    @must_be_writable
    @synchronized
    def __delitem__(self, p):
        """Delete an item by name which is directly contained by this collection."""
        del self._items[p]
        self._committed = False
        self.notify(DEL, self, p, None)

    @synchronized
    def keys(self):
        """Get a list of names of files and collections directly contained in this collection."""
        return self._items.keys()

    @synchronized
    def values(self):
        """Get a list of files and collection objects directly contained in this collection."""
        return self._items.values()

    @synchronized
    def items(self):
        """Get a list of (name, object) tuples directly contained in this collection."""
        return self._items.items()

    def exists(self, path):
        """Test if there is a file or collection at `path`."""
        return self.find(path) is not None

    @must_be_writable
    @synchronized
    def remove(self, path, recursive=False):
        """Remove the file or subcollection (directory) at `path`.

        :recursive:
          Specify whether to remove non-empty subcollections (True), or raise an error (False).

        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        if item is None:
            raise IOError(errno.ENOENT, "File not found")
        if len(pathcomponents) == 1:
            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
            deleteditem = self._items[pathcomponents[0]]
            del self._items[pathcomponents[0]]
            self._committed = False
            self.notify(DEL, self, pathcomponents[0], deleteditem)
        else:
            item.remove(pathcomponents[1])

    def _clonefrom(self, source):
        for k, v in source.items():
            self._items[k] = v.clone(self, k)

    def clone(self):
        raise NotImplementedError()

    @must_be_writable
    @synchronized
    def add(self, source_obj, target_name, overwrite=False, reparent=False):
        """Copy or move a file or subcollection to this collection.

        :source_obj:
          An ArvadosFile, or Subcollection object

        :target_name:
          Destination item name. If the target name already exists and is a
          file, this will raise an error unless you specify `overwrite=True`.

        :overwrite:
          Whether to overwrite target file if it already exists.

        :reparent:
          If True, source_obj will be moved from its parent collection to this collection.
          If False, source_obj will be copied and the parent collection will be
          unmodified.

        """
        if target_name in self and not overwrite:
            raise IOError(errno.EEXIST, "File already exists")

        modified_from = None
        if target_name in self:
            modified_from = self[target_name]

        # Actually make the move or copy.
        if reparent:
            source_obj._reparent(self, target_name)
            item = source_obj
        else:
            item = source_obj.clone(self, target_name)

        self._items[target_name] = item
        self._committed = False

        if modified_from:
            self.notify(MOD, self, target_name, (modified_from, item))
        else:
            self.notify(ADD, self, target_name, item)

    def _get_src_target(self, source, target_path, source_collection, create_dest):
        if source_collection is None:
            source_collection = self

        # Find the object to copy
        if isinstance(source, basestring):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError(errno.ENOENT, "File not found")
            sourcecomponents = source.split("/")
        else:
            source_obj = source
            sourcecomponents = None

        # Find the parent collection of the target path
        targetcomponents = target_path.split("/")
        # Determine the name to use.
        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]

        if not target_name:
            raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")

        if create_dest:
            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
        else:
            if len(targetcomponents) > 1:
                target_dir = self.find("/".join(targetcomponents[0:-1]))
            else:
                target_dir = self

        if target_dir is None:
            raise IOError(errno.ENOENT, "Target directory not found.")

        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        return (source_obj, target_dir, target_name)

    @must_be_writable
    @synchronized
    def copy(self, source, target_path, source_collection=None, overwrite=False):
        """Copy a file or subcollection to a new path in this collection.

        :source:
          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.

        :target_path:
          Destination file or path. If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection. If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to copy `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.
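
        For example (a sketch; the paths are hypothetical)::

            c.copy("input/raw.csv", "backup/raw.csv", overwrite=True)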

        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
        target_dir.add(source_obj, target_name, overwrite, False)

    @must_be_writable
    @synchronized
    def rename(self, source, target_path, source_collection=None, overwrite=False):
        """Move a file or subcollection from `source_collection` to a new path in this collection.

        :source:
          A string with a path to source file or subcollection.

        :target_path:
          Destination file or path. If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection. If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to move `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.

        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
        if not source_obj.writable():
            raise IOError(errno.EROFS, "Source collection is read only.")
        target_dir.add(source_obj, target_name, overwrite, True)

    @synchronized
    def portable_manifest_text(self, stream_name="."):
        """Get the manifest text for this collection, subcollections and files.

        This method does not flush outstanding blocks to Keep. It will return
        a normalized manifest with access tokens stripped.

        :stream_name:
          Name to use for this stream (directory)

        """
        return self._get_manifest_text(stream_name, True, True)

    @synchronized
    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        """Get the manifest text for this collection, subcollections and files.

        This method will flush outstanding blocks to Keep. By default, it will
        not normalize an unmodified manifest or strip access tokens.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        """
        self._my_block_manager().commit_all()
        return self._get_manifest_text(stream_name, strip, normalize)

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize):
        """Get the manifest text for this collection, subcollections and files.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        """
        if not self.committed() or self._manifest_text is None or normalize:
            stream = {}
            buf = []
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `filename`
                arvfile = self[filename]
                filestream = []
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                                      segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            if stream:
                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
            return "".join(buf)
        else:
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text

    @synchronized
    def diff(self, end_collection, prefix=".", holding_collection=None):
        """Generate list of add/modify/delete actions.

        When given to `apply`, will change `self` to match `end_collection`.
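
        A sketch (``end_collection`` is another Collection or
        CollectionReader object)::

            changes = c.diff(end_collection)
            c.apply(changes)   # c now matches end_collection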

        """
        changes = []
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        for k in self:
            if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
            if k in self:
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                elif end_collection[k] != self[k]:
                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            else:
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
        return changes

    @must_be_writable
    @synchronized
    def apply(self, changes):
        """Apply changes from `diff`.

        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.

        """
        if changes:
            self._committed = False
        for change in changes:
            event_type = change[0]
            path = change[1]
            initial = change[2]
            local = self.find(path)
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                    time.localtime()))
            if event_type == ADD:
                if local is None:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already a local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD:
                final = change[3]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                else:
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.

    def portable_data_hash(self):
        """Get the portable data hash for this collection's manifest."""
        stripped = self.portable_manifest_text()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    @synchronized
    def subscribe(self, callback):
        if self._callback is None:
            self._callback = callback
        else:
            raise errors.ArgumentError("A callback is already set on this collection.")

    @synchronized
    def unsubscribe(self):
        if self._callback is not None:
            self._callback = None

    @synchronized
    def notify(self, event, collection, name, item):
        if self._callback:
            self._callback(event, collection, name, item)
        self.root_collection().notify(event, collection, name, item)

    @synchronized
    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, RichCollectionBase):
            return False
        if len(self._items) != len(other):
            return False
        for k in self._items:
            if k not in other:
                return False
            if self._items[k] != other[k]:
                return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def flush(self):
        """Flush bufferblocks to Keep."""
        for e in self.values():
            e.flush()


class Collection(RichCollectionBase):
    """Represents the root of an Arvados Collection.

    This class is threadsafe. The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the entire
    collection).

    Brief summary of useful methods:

    :To read an existing file:
      `c.open("myfile", "r")`

    :To write a new file:
      `c.open("myfile", "w")`

    :To determine if a file exists:
      `c.find("myfile") is not None`

    :To copy a file:
      `c.copy("source", "dest")`

    :To delete a file:
      `c.remove("myfile")`

    :To save to an existing collection record:
      `c.save()`

    :To save a new collection record:
      `c.save_new()`

    :To merge remote changes into this object:
      `c.update()`

    Must be associated with an API server Collection record (during
    initialization, or using `save_new`) to use `save` or `update`.

    """

    def __init__(self, manifest_locator_or_text=None,
                 parent=None,
                 apiconfig=None,
                 api_client=None,
                 keep_client=None,
                 num_retries=None,
                 block_manager=None):
        """Collection constructor.

        :manifest_locator_or_text:
          One of Arvados collection UUID, block locator of
          a manifest, raw manifest text, or None (to create an empty collection).
        :parent:
          the parent Collection, may be None.
        :apiconfig:
          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.
        :api_client:
          The API client object to use for requests. If not specified, create one using `apiconfig`.
        :keep_client:
          the Keep client to use for requests. If not specified, create one using `apiconfig`.
        :num_retries:
          the number of retries for API and Keep requests.
        :block_manager:
          the block manager to use. If not specified, create one.
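
        A sketch of typical use (the file and collection names are
        hypothetical)::

            c = Collection()                 # start an empty collection
            with c.open("data.txt", "w") as f:
                f.write("hello")
            c.save_new(name="my collection")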

        """
        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client
        self._block_manager = block_manager

        if apiconfig:
            self._config = apiconfig
        else:
            self._config = config.settings()

        self.num_retries = num_retries if num_retries is not None else 0
        self._manifest_locator = None
        self._manifest_text = None
        self._api_response = None
        self._past_versions = set()

        self.lock = threading.RLock()
        self.events = None

        if manifest_locator_or_text:
            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
            else:
                raise errors.ArgumentError(
                    "Argument to Collection constructor is not a manifest or a collection UUID")

            try:
                self._populate()
            except (IOError, errors.SyntaxError) as e:
                raise errors.ArgumentError("Error processing manifest text: %s" % e)

    def root_collection(self):
        return self

    def stream_name(self):
        return "."

    def writable(self):
        return True

    @synchronized
    def known_past_version(self, modified_at_and_portable_data_hash):
        return modified_at_and_portable_data_hash in self._past_versions

    @synchronized
    @retry_method
    def update(self, other=None, num_retries=None):
        """Merge the latest collection on the API server with the current collection."""

        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            if self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))):
                # We've merged this record before. Don't do anything.
                return
            else:
                self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
            other = CollectionReader(response["manifest_text"])
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        self._manifest_text = self.manifest_text()

    @synchronized
    def _my_api(self):
        if self._api_client is None:
            self._api_client = ThreadSafeApiCache(self._config)
            self._keep_client = self._api_client.keep
        return self._api_client

    @synchronized
    def _my_keep(self):
        if self._keep_client is None:
            if self._api_client is None:
                self._my_api()
            else:
                self._keep_client = KeepClient(api_client=self._api_client)
        return self._keep_client

    @synchronized
    def _my_block_manager(self):
        if self._block_manager is None:
            self._block_manager = _BlockManager(self._my_keep())
        return self._block_manager

    def _remember_api_response(self, response):
        self._api_response = response
        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server. If we do build one, make sure our Keep client uses
        # it. If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        try:
            self._remember_api_response(self._my_api().collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries))
            self._manifest_text = self._api_response['manifest_text']
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        if self._manifest_locator is None and self._manifest_text is None:
            # Nothing to load; this is a new, empty collection.
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                     self._manifest_locator,
                     error_via_api,
                     error_via_keep))
        self._baseline_manifest = self._manifest_text
        self._import_manifest(self._manifest_text)

    @synchronized
    def _has_collection_uuid(self):
        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Support scoped auto-commit in a with: block."""
        if exc_type is None:
            if self.writable() and self._has_collection_uuid():
                self.save()
        self.stop_threads()

    def stop_threads(self):
        if self._block_manager is not None:
            self._block_manager.stop_threads()

    @synchronized
    def manifest_locator(self):
        """Get the manifest locator, if any.

        The manifest locator will be set when the collection is loaded from an
        API server record or the portable data hash of a manifest.

        The manifest locator will be None if the collection is newly created or
        was created directly from manifest text. The method `save_new()` will
        assign a manifest locator.

        """
        return self._manifest_locator

    @synchronized
    def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
        if new_config is None:
            new_config = self._config
        if readonly:
            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
        else:
            newcollection = Collection(parent=new_parent, apiconfig=new_config)

        newcollection._clonefrom(self)
        return newcollection

    @synchronized
    def api_response(self):
        """Returns information about this Collection fetched from the API server.

        If the Collection exists in Keep but not the API server, currently
        returns None. Future versions may provide a synthetic response.

        """
        return self._api_response

    def find_or_create(self, path, create_type):
        """See `RichCollectionBase.find_or_create`"""
        if path == ".":
            return self
        else:
            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

    def find(self, path):
        """See `RichCollectionBase.find`"""
        if path == ".":
            return self
        else:
            return super(Collection, self).find(path[2:] if path.startswith("./") else path)

    def remove(self, path, recursive=False):
        """See `RichCollectionBase.remove`"""
        if path == ".":
            raise errors.ArgumentError("Cannot remove '.'")
        else:
            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)

    @must_be_writable
    @synchronized
    @retry_method
    def save(self, merge=True, num_retries=None):
        """Save collection to an existing collection record.

        Commit pending buffer blocks to Keep, merge with remote record (if
        merge=True, the default), and update the collection record. Returns
        the current manifest text.

        Will raise AssertionError if not associated with a collection record on
        the API server. If you want to save a manifest to Keep only, see
        `save_new()`.

        :merge:
          Update and merge remote changes before saving. Otherwise, any
          remote changes will be ignored and overwritten.

        :num_retries:
          Retry count on API calls (if None, use the collection default)

        """
        if not self.committed():
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")

            self._my_block_manager().commit_all()

            if merge:
                self.update()

            text = self.manifest_text(strip=False)
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body={'manifest_text': text}
                ).execute(
                    num_retries=num_retries))
            self._manifest_text = self._api_response["manifest_text"]
            self.set_committed()

        return self._manifest_text

    @must_be_writable
    @synchronized
    @retry_method
    def save_new(self, name=None,
                 create_collection_record=True,
                 owner_uuid=None,
                 ensure_unique_name=False,
                 num_retries=None):
        """Save collection to a new collection record.

        Commit pending buffer blocks to Keep and, when create_collection_record
        is True (default), create a new collection record. After creating a
        new collection record, this Collection object will be associated with
        the new record used by `save()`. Returns the current manifest text.

        :name:
          The collection name.

        :create_collection_record:
          If True, create a collection record on the API server.
          If False, only commit blocks to Keep and return the manifest text.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner. If
          False, a name conflict will result in an error.

        :num_retries:
          Retry count on API calls (if None, use the collection default)
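
        A sketch of typical use (the name is a placeholder)::

            c.save_new(name="processed results",
                       ensure_unique_name=True)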

        """
        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)

        if create_collection_record:
            if name is None:
                name = "New collection"
                ensure_unique_name = True

            body = {"manifest_text": text,
                    "name": name}
            if owner_uuid:
                body["owner_uuid"] = owner_uuid

            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
            text = self._api_response["manifest_text"]

            self._manifest_locator = self._api_response["uuid"]

            self._manifest_text = text
            self.set_committed()

        return text

    @synchronized
    def _import_manifest(self, manifest_text):
        """Import a manifest into a `Collection`.

        :manifest_text:
          The manifest text to import from.

        """
        if len(self) > 0:
            raise errors.ArgumentError("Can only import manifest into an empty collection")

        STREAM_NAME = 0
        BLOCKS = 1
        SEGMENTS = 2

        stream_name = None
        state = STREAM_NAME

        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
            tok = token_and_separator.group(1)
            sep = token_and_separator.group(2)

            if state == STREAM_NAME:
                # starting a new stream
                stream_name = tok.replace('\\040', ' ')
                blocks = []
                segments = []
                streamoffset = 0L
                state = BLOCKS
                self.find_or_create(stream_name, COLLECTION)
                continue

            if state == BLOCKS:
                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                if block_locator:
                    blocksize = long(block_locator.group(1))
                    blocks.append(Range(tok, streamoffset, blocksize, 0))
                    streamoffset += blocksize
                else:
                    state = SEGMENTS

            if state == SEGMENTS:
                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
                if file_segment:
                    pos = long(file_segment.group(1))
                    size = long(file_segment.group(2))
                    name = file_segment.group(3).replace('\\040', ' ')
                    filepath = os.path.join(stream_name, name)
                    afile = self.find_or_create(filepath, FILE)
                    if isinstance(afile, ArvadosFile):
                        afile.add_segment(blocks, pos, size)
                    else:
                        raise errors.SyntaxError("File %s conflicts with stream of the same name." % filepath)
                else:
                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)

            if sep == "\n":
                stream_name = None
                state = STREAM_NAME

        self.set_committed()

    @synchronized
    def notify(self, event, collection, name, item):
        if self._callback:
            self._callback(event, collection, name, item)


class Subcollection(RichCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record.

    Subcollection locking falls under the umbrella lock of its root collection.

    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        self.lock = self.root_collection().lock
        self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries

    def root_collection(self):
        return self.parent.root_collection()

    def writable(self):
        return self.root_collection().writable()

    def _my_api(self):
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def stream_name(self):
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(self, new_parent, new_name):
        c = Subcollection(new_parent, new_name)
        c._clonefrom(self)
        return c

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self._committed = False
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class CollectionReader(Collection):
    """A read-only collection object.

    Initialize from an api collection record locator, a portable data hash of a
    manifest, or raw manifest text. See `Collection` constructor for detailed
    options.
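
    A sketch of typical use (the locator and file name are placeholders)::

        reader = CollectionReader('zzzzz-4zz18-xxxxxxxxxxxxxxx')
        with reader.open('data.txt') as f:
            content = f.read()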

    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()

        # Backwards compatibility with old CollectionReader
        # all_streams() and all_files()
        self._streams = None

    def writable(self):
        return self._in_init

    def _populate_streams(orig_func):
        @functools.wraps(orig_func)
        def populate_streams_wrapper(self, *args, **kwargs):
            # Defer populating self._streams until needed since it creates a copy of the manifest.
            if self._streams is None:
                if self._manifest_text:
                    self._streams = [sline.split()
                                     for sline in self._manifest_text.split("\n")
                                     if sline]
                else:
                    self._streams = []
            return orig_func(self, *args, **kwargs)
        return populate_streams_wrapper

    @_populate_streams
    def normalize(self):
        """Normalize the streams returned by `all_streams`.

        This method is kept for backwards compatibility and only affects the
        behavior of `all_streams()` and `all_files()`.

        """
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

    @_populate_streams
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    @_populate_streams
    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f