10 from collections import deque
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
23 from arvados.retry import retry_method
25 _logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    """Shared base for collection reader/writer classes (partial view)."""

    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): the original body of __exit__ and the following
        # `def _my_keep(self):` header are missing from this extract; the
        # statements below belong to _my_keep(), which lazily builds and
        # caches a KeepClient configured with this object's API client and
        # retry count.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client
    def stripped_manifest(self):
        """Get the manifest with locator hints stripped.

        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        # NOTE(review): the lines initializing `clean`/`fields`, the
        # comprehension's else-branch and closing bracket, and the final
        # return are missing from this extract.
        for line in raw.split("\n"):
            # Keep the stream name (field 0) untouched; for every other
            # field that matches the Keep locator pattern, strip every hint
            # that does not start with a digit (i.e. keep only size hints).
            clean_fields = fields[:1] + [
                (re.sub(r'\+[^\d][^\+]*', '', x)
                 if re.match(util.keep_locator_pattern, x)
            clean += [' '.join(clean_fields), "\n"]
class _WriterFile(_FileLikeObjectBase):
    """File-like object returned by CollectionWriter.open(); writes are
    delegated to the owning CollectionWriter (partial view)."""

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

        # NOTE(review): the `def close(self):` header (and its decorator,
        # if any) is missing above these two statements.
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        # NOTE(review): body missing from this extract (delegates to dest).

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        # NOTE(review): body missing from this extract.

    @_FileLikeObjectBase._before_close
        # NOTE(review): the `def flush(self):` header is missing here.
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you are done.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        # Fall back to 2 copies when the caller expresses no preference.
        self.replication = (2 if replication is None else replication)
        self._keep_client = None  # built lazily by _my_keep()
        # Buffered data not yet flushed to a Keep block.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the stream (manifest "directory") currently being built.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        # State of the file currently being written within that stream.
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work-queue state used by do_queued_work() and the _work_* helpers.
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): body of __exit__ is missing from this extract.

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        # NOTE(review): the loop header and the _work_* calls under each
        # branch below are missing from this extract.
        if self._queued_file:
        elif self._queued_dirents:
        elif self._queued_trees:
    def _work_file(self):
        # Copy the queued file into the collection one Keep block at a time,
        # then finish the file and clear the queue slot.
        # NOTE(review): the read-loop header, the write/break lines, and the
        # `if self._close_file:` guard are missing from this extract.
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
        self.finish_current_file()
            # Only close files this writer opened itself (see _queue_file).
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None
    def _work_dirents(self):
        # Write queued directory entries into the stream for the tree at the
        # head of _queued_trees, recursing into subdirectories.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                # Subdirectory: queue it as its own stream, one level deeper.
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
                # NOTE(review): the `else:` header and the statement after
                # _queue_file (presumably a `break` so the queued file is
                # processed first -- TODO confirm) are missing here.
                self._queue_file(target, dirent)
        if not self._queued_dirents:
            self._queued_trees.popleft()
    def _work_trees(self):
        # Start processing the next queued directory tree: list its contents
        # and queue the resulting directory entries.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        # NOTE(review): the `if`/`else` headers around the two calls below
        # (queue entries when the listing is non-empty, otherwise drop the
        # tree) are missing from this extract.
        self._queue_dirents(stream_name, d)
        self._queued_trees.popleft()
    def _queue_file(self, source, filename=None):
        """Queue one file for writing; `source` is a path or file-like object."""
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            # A path was given: open it ourselves and remember to close it.
            source = open(source, 'rb')
            self._close_file = True
            # NOTE(review): the `else:` header is missing above this line.
            self._close_file = False
        # NOTE(review): the `if filename is None:` guard is missing before
        # this fallback to the source object's own name.
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source
202 def _queue_dirents(self, stream_name, dirents):
203 assert (not self._queued_dirents), "tried to queue more than one tree"
204 self._queued_dirents = deque(sorted(dirents))
206 def _queue_tree(self, path, stream_name, max_manifest_depth):
207 self._queued_trees.append((path, stream_name, max_manifest_depth))
    def write_file(self, source, filename=None):
        """Queue a single file (path or file-like object) and write it now."""
        self._queue_file(source, filename)
        self.do_queued_work()
    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Queue a directory tree and write all of it to the collection now.

        `max_manifest_depth` limits how many directory levels become
        separate streams; -1 means unlimited.
        """
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
    def write(self, newdata):
        """Append `newdata` to the current file, flushing full Keep blocks
        as they accumulate."""
        if hasattr(newdata, '__iter__'):
            # NOTE(review): the iterable branch (write each element, then
            # return) is missing from this extract.
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            # NOTE(review): loop body (a flush_data() call) is missing.
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        # NOTE(review): the `if filename is None:` guard is missing from
        # this extract; split() applies only when the caller passed a
        # single combined path.
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open
    def flush_data(self):
        """Flush one Keep block's worth of buffered data out to Keep."""
        data_buffer = ''.join(self._data_buffer)
        # NOTE(review): the non-empty guard and the `self._my_keep().put(`
        # call header are missing from this extract; the appended value is
        # the locator returned by that put().
            self._current_stream_locators.append(
                    data_buffer[0:config.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            # Keep whatever didn't fit in this block buffered for the next.
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        """Finish the file in progress and begin a new one with this name."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        """Validate and record the name of the file being written.

        Tabs, newlines and NUL bytes are rejected because they cannot be
        represented in a manifest.
        """
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                # NOTE(review): the argument line (the offending name) is
                # missing from this extract.
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                # NOTE(review): the argument line is missing here too.
        self._current_file_name = newfilename
    def current_file_name(self):
        """Return the name of the file currently being written (or None)."""
        return self._current_file_name
    def finish_current_file(self):
        """Close out the in-progress file and record its manifest segment."""
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # NOTE(review): the early `return` for the harmless
                # no-unnamed-data case is missing from this extract.
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        # Record [offset, size, name] for this file within the stream.
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None
    def start_new_stream(self, newstreamname='.'):
        """Finish the stream in progress and begin a new one with this name."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
307 def set_current_stream_name(self, newstreamname):
308 if re.search(r'[\t\n]', newstreamname):
309 raise errors.AssertionError(
310 "Manifest stream names cannot contain whitespace")
311 self._current_stream_name = '.' if newstreamname=='' else newstreamname
    def current_stream_name(self):
        """Return the name of the stream currently being written."""
        return self._current_stream_name
    def finish_current_stream(self):
        """Close out the in-progress stream and add it to _finished_streams."""
        self.finish_current_file()
        if not self._current_stream_files:
            # NOTE(review): the body of this branch and the `else:` before
            # the locator check below are missing from this extract.
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
            if not self._current_stream_locators:
                # An empty stream still needs one locator in the manifest.
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        # Reset per-stream state for the next stream.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
        # NOTE(review): the enclosing `def` line for this method is missing
        # from this extract.
        """Store the manifest in Keep and return its locator.

        This is useful for storing manifest fragments (task outputs)
        temporarily in Keep during a Crunch job.

        In other cases you should make a collection instead, by
        sending manifest_text() to the API server's "create
        collection" endpoint.
        """
        return self._my_keep().put(self.manifest_text(), copies=self.replication)
350 def portable_data_hash(self):
351 stripped = self.stripped_manifest()
352 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
    def manifest_text(self):
        """Assemble and return the manifest text for all finished streams."""
        self.finish_current_stream()
        # NOTE(review): the `manifest` initializer, the error branch for
        # stream names not rooted at '.', and the final return are missing
        # from this extract.
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
            # Stream name, block locators, then offset:size:name file
            # tokens; literal spaces are escaped as \040 per manifest rules.
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
    def data_locators(self):
        """Return the Keep locators referenced by finished streams."""
        # NOTE(review): the accumulator setup, loop body, and return are
        # missing from this extract.
        for name, locators, files in self._finished_streams:
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose progress can be serialized (dump_state) and
    later resumed (from_state)."""

    # Attributes captured by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        # Map of file path -> os.stat() tuple, used to detect source files
        # that changed between checkpoint and resume.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
    # NOTE(review): a `@classmethod` decorator line appears to be missing
    # above this def (the `cls` parameter implies it) -- confirm upstream.
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            # NOTE(review): the `try:` header and the final `return writer`
            # are missing from this extract.
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
    def check_dependencies(self):
        """Raise StaleWriterStateError if any recorded source file changed."""
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            # NOTE(review): the `try:` header is missing above this stat.
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            # A dependency is stale if it is no longer a regular file, or
            # its recorded mtime or size differs from what is on disk now.
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))
    def dump_state(self, copy_func=lambda x: x):
        """Build a dict of resumable state, passing values through copy_func."""
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
            # NOTE(review): the `else:` header and the final `return state`
            # are missing from this extract.
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
    def _queue_file(self, source, filename=None):
        """Queue a file and record its stat() so resume can detect changes."""
        # NOTE(review): the string-type guard, `try:` header, the
        # `path_stat = None` fallback and the `else:` before the final
        # recording line are missing from this extract.
            src_path = os.path.realpath(source)
            raise errors.AssertionError("{} not a file path".format(source))
            path_stat = os.stat(src_path)
        except OSError as stat_error:
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
            self._dependencies[src_path] = tuple(fd_stat)
467 def write(self, data):
468 if self._queued_file is None:
469 raise errors.AssertionError(
470 "resumable writer can't accept unsourced data")
471 return super(ResumableCollectionWriter, self).write(data)
479 COLLECTION = "collection"
class RichCollectionBase(CollectionBase):
    """Base class for Collections and Subcollections.

    Implements the majority of functionality relating to accessing items in the
    Collection.
    """

    def __init__(self, parent=None):
        # NOTE(review): additional attribute assignments (e.g. the parent
        # reference and the item dict used throughout as self._items) are
        # missing from this extract.
        self._committed = False   # True once synced with the API server
        self._callback = None     # subscriber notified via notify()

    # NOTE(review): the next several `raise NotImplementedError()` bodies
    # belong to abstract hooks whose `def` headers are partly missing from
    # this extract; subclasses must override them.
        raise NotImplementedError()

        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

        raise NotImplementedError()

    def root_collection(self):
        raise NotImplementedError()

    def notify(self, event, collection, name, item):
        raise NotImplementedError()

    def stream_name(self):
        raise NotImplementedError()
    def find_or_create(self, path, create_type):
        """Recursively search the specified file path.

        May return either a `Collection` or `ArvadosFile`.  If not found, will
        create a new item at the specified path based on `create_type`.  Will
        create intermediate subcollections needed to contain the final item in
        the path.

        :create_type:
            One of `arvados.collection.FILE` or
            `arvados.collection.COLLECTION`.  If the path is not found, and value
            of create_type is FILE then create and return a new ArvadosFile for
            the last path component.  If COLLECTION, then create and return a new
            Collection for the last path component.
        """
        # NOTE(review): several control lines (guards on `item is None`,
        # `else:` headers, and `return` statements) are missing from this
        # extract; the visible statements show the creation paths only.
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                if create_type == COLLECTION:
                    item = Subcollection(self, pathcomponents[0])
                    item = ArvadosFile(self, pathcomponents[0])
                self._items[pathcomponents[0]] = item
                self._committed = False
                self.notify(ADD, self, pathcomponents[0], item)
                # create new collection
                item = Subcollection(self, pathcomponents[0])
                self._items[pathcomponents[0]] = item
                self._committed = False
                self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    return item.find_or_create(pathcomponents[1], create_type)
                    raise IOError(errno.ENOTDIR, "Not a directory: '%s'" % pathcomponents[0])
    def find(self, path):
        """Recursively search the specified file path.

        May return either a Collection or ArvadosFile.  Return None if not
        found.
        """
        # NOTE(review): the empty-path guard header and several `else:` /
        # `return` lines are missing from this extract.
            raise errors.ArgumentError("Parameter 'path' is empty.")
        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        if len(pathcomponents) == 1:
            if isinstance(item, RichCollectionBase):
                if pathcomponents[1]:
                    return item.find(pathcomponents[1])
                raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
588 def mkdirs(self, path):
589 """Recursive subcollection create.
591 Like `os.makedirs()`. Will create intermediate subcollections needed
592 to contain the leaf subcollection path.
596 if self.find(path) != None:
597 raise IOError(errno.EEXIST, "Directory or file exists: '%s'" % path)
599 return self.find_or_create(path, COLLECTION)
    def open(self, path, mode="r"):
        """Open a file-like object for access.

        :path:
            path to a file in the collection
        :mode:
            one of "r", "r+", "w", "w+", "a", "a+"

        "r+"
            opens for reading and writing.  Reads/writes share a file pointer.
        "w", "w+"
            truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
        "a", "a+"
            opens for reading and writing.  All writes are appended to
            the end of the file.  Writing does not affect the file pointer for
            reading.
        """
        # Binary flag is a no-op here: collection files are byte streams.
        mode = mode.replace("b", "")
        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
            raise errors.ArgumentError("Bad mode '%s'" % mode)
        create = (mode != "r")

        if create and not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")
        # NOTE(review): the if/else headers around the create-vs-lookup
        # branches and around the reader/writer returns are missing from
        # this extract.
            arvfile = self.find_or_create(path, FILE)
            arvfile = self.find(path)

            raise IOError(errno.ENOENT, "File not found")
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Is a directory: %s" % path)

        name = os.path.basename(path)

            return ArvadosFileReader(arvfile, num_retries=self.num_retries)

            return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
        # NOTE(review): the `def modified(self):` header is missing above
        # this docstring.
        """Determine if the collection has been modified since last commited."""
        return not self.committed()

        # NOTE(review): the `def committed(self):` header is missing above
        # this docstring, as are its `return` statements.
        """Determine if the collection has been committed to the API server."""
        if self._committed is False:
        # A collection is committed only if every child is committed too.
        for v in self._items.values():
            if v.committed() is False:
    def set_committed(self):
        """Recursively set committed flag to True."""
        self._committed = True
        # NOTE(review): the loop body (presumably a recursive call on each
        # child -- TODO confirm) is missing from this extract.
        for k,v in self._items.items():
        # NOTE(review): the `def __iter__(self):` header is missing above
        # this docstring.
        """Iterate over names of files and collections contained in this collection."""
        return iter(self._items.keys())

    def __getitem__(self, k):
        """Get a file or collection that is directly contained by this collection.

        If you want to search a path, use `find()` instead.
        """
        return self._items[k]

    def __contains__(self, k):
        """Test if there is a file or collection a directly contained by this collection."""
        return k in self._items

        # NOTE(review): the `def __len__(self):` header is missing above
        # this docstring.
        """Get the number of items directly contained in this collection."""
        return len(self._items)
    def __delitem__(self, p):
        """Delete an item by name which is directly contained by this collection."""
        # NOTE(review): the actual deletion from self._items is missing
        # from this extract; only the bookkeeping lines are visible.
        self._committed = False
        self.notify(DEL, self, p, None)

        # NOTE(review): the `def keys/values/items` headers are missing
        # above their respective docstrings below.
        """Get a list of names of files and collections directly contained in this collection."""
        return self._items.keys()

        """Get a list of files and collection objects directly contained in this collection."""
        return self._items.values()

        """Get a list of (name, object) tuples directly contained in this collection."""
        return self._items.items()
716 def exists(self, path):
717 """Test if there is a file or collection at `path`."""
718 return self.find(path) is not None
    def remove(self, path, recursive=False):
        """Remove the file or subcollection (directory) at `path`.

        :recursive:
            Specify whether to remove non-empty subcollections (True), or raise an error (False).
        """
        # NOTE(review): the empty-path guard header, the `item is None`
        # guard, and the `else:` before the recursive call are missing from
        # this extract.
            raise errors.ArgumentError("Parameter 'path' is empty.")
        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
            raise IOError(errno.ENOENT, "File not found")
        if len(pathcomponents) == 1:
            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
            deleteditem = self._items[pathcomponents[0]]
            del self._items[pathcomponents[0]]
            self._committed = False
            self.notify(DEL, self, pathcomponents[0], deleteditem)
            # TODO(review): `recursive` is not forwarded to the nested
            # call -- confirm whether nested removes should honor it.
            item.remove(pathcomponents[1])
    def _clonefrom(self, source):
        # Clone every child of `source` into this collection, keyed by name.
        for k,v in source.items():
            self._items[k] = v.clone(self, k)

        # NOTE(review): this orphaned raise belongs to an abstract method
        # (presumably `clone`) whose `def` header is missing here.
        raise NotImplementedError()
755 def add(self, source_obj, target_name, overwrite=False, reparent=False):
756 """Copy or move a file or subcollection to this collection.
759 An ArvadosFile, or Subcollection object
762 Destination item name. If the target name already exists and is a
763 file, this will raise an error unless you specify `overwrite=True`.
766 Whether to overwrite target file if it already exists.
769 If True, source_obj will be moved from its parent collection to this collection.
770 If False, source_obj will be copied and the parent collection will be
775 if target_name in self and not overwrite:
776 raise IOError(errno.EEXIST, "File already exists")
779 if target_name in self:
780 modified_from = self[target_name]
782 # Actually make the move or copy.
784 source_obj._reparent(self, target_name)
787 item = source_obj.clone(self, target_name)
789 self._items[target_name] = item
790 self._committed = False
793 self.notify(MOD, self, target_name, (modified_from, item))
795 self.notify(ADD, self, target_name, item)
    def _get_src_target(self, source, target_path, source_collection, create_dest):
        # Resolve `source` (path or object) plus the destination directory
        # and item name for copy()/rename().
        # Returns (source_obj, target_dir, target_name).
        if source_collection is None:
            source_collection = self

        # Look up the object to copy/move when given a path string.
        if isinstance(source, basestring):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError(errno.ENOENT, "File not found")
            sourcecomponents = source.split("/")
            # NOTE(review): the `else:` branch assigning the object itself
            # is missing above this line.
            sourcecomponents = None

        # Find parent collection the target path
        targetcomponents = target_path.split("/")
        # Determine the name to use.
        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
        # NOTE(review): the empty-name guard and the if/else headers around
        # the create_dest choice below are missing from this extract.
            raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")

            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)

            if len(targetcomponents) > 1:
                target_dir = self.find("/".join(targetcomponents[0:-1]))

        if target_dir is None:
            raise IOError(errno.ENOENT, "Target directory not found.")

        # If the target names an existing subcollection, place the item
        # inside it under the source's own name.
        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        return (source_obj, target_dir, target_name)
    def copy(self, source, target_path, source_collection=None, overwrite=False):
        """Copy a file or subcollection to a new path in this collection.

        :source:
            A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.

        :target_path:
            Destination file or path.  If the target path already exists and is a
            subcollection, the item will be placed inside the subcollection.  If
            the target path already exists and is a file, this will raise an error
            unless you specify `overwrite=True`.

        :source_collection:
            Collection to copy `source_path` from (default `self`)

        :overwrite:
            Whether to overwrite target file if it already exists.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
        target_dir.add(source_obj, target_name, overwrite, False)
    def rename(self, source, target_path, source_collection=None, overwrite=False):
        """Move a file or subcollection from `source_collection` to a new path in this collection.

        :source:
            A string with a path to source file or subcollection.

        :target_path:
            Destination file or path.  If the target path already exists and is a
            subcollection, the item will be placed inside the subcollection.  If
            the target path already exists and is a file, this will raise an error
            unless you specify `overwrite=True`.

        :source_collection:
            Collection to copy `source_path` from (default `self`)

        :overwrite:
            Whether to overwrite target file if it already exists.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
        if not source_obj.writable():
            raise IOError(errno.EROFS, "Source collection is read only.")
        # reparent=True: move the item rather than copying it.
        target_dir.add(source_obj, target_name, overwrite, True)
    def portable_manifest_text(self, stream_name="."):
        """Get the manifest text for this collection, sub collections and files.

        This method does not flush outstanding blocks to Keep.  It will return
        a normalized manifest with access tokens stripped.

        :stream_name:
            Name to use for this stream (directory)
        """
        return self._get_manifest_text(stream_name, True, True)
    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        """Get the manifest text for this collection, sub collections and files.

        This method will flush outstanding blocks to Keep.  By default, it will
        not normalize an unmodified manifest or strip access tokens.

        :stream_name:
            Name to use for this stream (directory)

        :strip:
            If True, remove signing tokens from block locators if present.
            If False (default), block locators are left unchanged.

        :normalize:
            If True, always export the manifest text in normalized form
            even if the Collection is not modified.  If False (default) and the collection
            is not modified, return the original manifest text even if it is not
            in normalized form.
        """
        self._my_block_manager().commit_all()
        return self._get_manifest_text(stream_name, strip, normalize)
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
            Name to use for this stream (directory)

        :strip:
            If True, remove signing tokens from block locators if present.
            If False (default), block locators are left unchanged.

        :normalize:
            If True, always export the manifest text in normalized form
            even if the Collection is not modified.  If False (default) and the collection
            is not modified, return the original manifest text even if it is not
            in normalized form.

        :only_committed:
            If True, only include blocks that were already committed to Keep.
        """
        # NOTE(review): initializers (stream/buf/filestream), the
        # bufferblock/only_committed branch headers, the `if strip:` guards,
        # and the final joins/returns are missing from this extract.
        if not self.committed() or self._manifest_text is None or normalize:
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `k`
                arvfile = self[filename]
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # Uncommitted data lives in a bufferblock; use its
                        # current locator.
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                         segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
            return self.stripped_manifest()
        return self._manifest_text
    def diff(self, end_collection, prefix=".", holding_collection=None):
        """Generate list of add/modify/delete actions.

        When given to `apply`, will change `self` to match `end_collection`
        """
        # NOTE(review): the `changes` initializer, the `for k in self:`
        # header, membership guards, and the final return are missing from
        # this extract.
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
            if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
            if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                # Recurse into matching subcollections.
                changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
            elif end_collection[k] != self[k]:
                # Content differs: record before/after clones as a MOD.
                changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
    def apply(self, changes):
        """Apply changes from `diff`.

        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.
        """
        # NOTE(review): the tuple unpacking of each change (path, initial,
        # final) and several branch headers are missing from this extract.
        self._committed = False
        for change in changes:
            event_type = change[0]
            local = self.find(path)
            # Conflicting edits are preserved under a timestamped name.
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
            if event_type == ADD:
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
1050 def portable_data_hash(self):
1051 """Get the portable data hash for this collection's manifest."""
1052 stripped = self.portable_manifest_text()
1053 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
    def subscribe(self, callback):
        """Register a callback to be invoked on collection change events."""
        if self._callback is None:
            self._callback = callback
            # NOTE(review): the `else:` header is missing above this raise.
            raise errors.ArgumentError("A callback is already set on this collection.")

    def unsubscribe(self):
        """Remove the registered change callback, if any."""
        if self._callback is not None:
            self._callback = None

    def notify(self, event, collection, name, item):
        # NOTE(review): the guard around the callback invocation is missing
        # from this extract; events also propagate to the root collection.
            self._callback(event, collection, name, item)
        self.root_collection().notify(event, collection, name, item)
    def __eq__(self, other):
        # Two collections compare equal when they hold the same names
        # mapping to equal items.  NOTE(review): the identity shortcut,
        # the `return False`/`return True` lines, and the `k not in other`
        # guard are missing from this extract.
        if not isinstance(other, RichCollectionBase):
        if len(self._items) != len(other):
        for k in self._items:
            if self._items[k] != other[k]:
1088 def __ne__(self, other):
1089 return not self.__eq__(other)
        # NOTE(review): the `def flush(self):` header (and any decorators)
        # is missing above this docstring, and the loop body is missing
        # below it.
        """Flush bufferblocks to Keep."""
        for e in self.values():
1098 class Collection(RichCollectionBase):
    """Represents the root of an Arvados Collection.

    This class is threadsafe.  The root collection object, all
    subcollections and files are protected by a single lock (i.e. each
    access locks the entire collection).

    Brief usage examples:

    * To read an existing file: ``c.open("myfile", "r")``
    * To write a new file: ``c.open("myfile", "w")``
    * To determine if a file exists: ``c.find("myfile") is not None``
    * To copy a file: ``c.copy("source", "dest")``
    * To delete a file: ``c.remove("myfile")``
    * To save to an existing collection record: ``c.save()``
    * To save a new collection record: ``c.save_new()``
    * To merge remote changes into this object: ``c.update()``

    Must be associated with an API server Collection record (during
    initialization, or using `save_new`) to use `save` or `update`.
    """
# Collection constructor.
# NOTE(review): the parameter lines between the two signature fragments
# below (originals 1138-1143) are missing from this excerpt; per the
# docstring they presumably include parent, apiconfig, api_client,
# keep_client, num_retries and block_manager — confirm against the
# full source.
1137 def __init__(self, manifest_locator_or_text=None,
1144 replication_desired=None):
1145 """Collection constructor.
1147 :manifest_locator_or_text:
1148 One of Arvados collection UUID, block locator of
1149 a manifest, raw manifest text, or None (to create an empty collection).
1151 the parent Collection, may be None.
1154 A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1155 Prefer this over supplying your own api_client and keep_client (except in testing).
1156 Will use default config settings if not specified.
1159 The API client object to use for requests. If not specified, create one using `apiconfig`.
1162 the Keep client to use for requests. If not specified, create one using `apiconfig`.
1165 the number of retries for API and Keep requests.
1168 the block manager to use. If not specified, create one.
1170 :replication_desired:
1171 How many copies should Arvados maintain. If None, API server default
1172 configuration applies. If not None, this value will also be used
1173 for determining the number of block copies being written.
# Store injected clients (may be None; lazily created later by
# _my_api()/_my_keep()/_my_block_manager()).
1176 super(Collection, self).__init__(parent)
1177 self._api_client = api_client
1178 self._keep_client = keep_client
1179 self._block_manager = block_manager
1180 self.replication_desired = replication_desired
# Fall back to default config settings when no apiconfig was given.
1183 self._config = apiconfig
1185 self._config = config.settings()
1187 self.num_retries = num_retries if num_retries is not None else 0
1188 self._manifest_locator = None
1189 self._manifest_text = None
1190 self._api_response = None
# Versions (modified_at, portable_data_hash) already seen/merged.
1191 self._past_versions = set()
# Single reentrant lock protecting the whole collection tree.
1193 self.lock = threading.RLock()
# Classify the argument: keep locator / collection UUID are stored as
# a locator to fetch later; raw manifest text is kept directly.
1196 if manifest_locator_or_text:
1197 if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1198 self._manifest_locator = manifest_locator_or_text
1199 elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1200 self._manifest_locator = manifest_locator_or_text
1201 elif re.match(util.manifest_pattern, manifest_locator_or_text):
1202 self._manifest_text = manifest_locator_or_text
# NOTE(review): this message says "CollectionReader" although it can
# be raised from Collection as well — possibly a leftover from the
# older API; consider rewording upstream.
1204 raise errors.ArgumentError(
1205 "Argument to CollectionReader is not a manifest or a collection UUID")
# NOTE(review): the 'try:' and the call it wraps (originals 1206-1208,
# presumably self._populate()) are missing from this excerpt.
1209 except (IOError, errors.SyntaxError) as e:
1210 raise errors.ArgumentError("Error processing manifest text: %s", e)
# A Collection is its own root.
# NOTE(review): the bodies of these two methods (originals 1213-1214
# and 1216-1219) are missing from this excerpt.
1212 def root_collection(self):
# Stream name for the root collection (presumably ".").
1215 def stream_name(self):
def known_past_version(self, modified_at_and_portable_data_hash):
    """Return True if this (modified_at, portable_data_hash) pair has
    already been recorded in a previous API response for this
    collection."""
    seen = self._past_versions
    return modified_at_and_portable_data_hash in seen
1227 def update(self, other=None, num_retries=None):
1228 """Merge the latest collection on the API server with the current collection."""
# NOTE(review): the enclosing guard lines (originals 1229-1230,
# presumably 'if other is None:') are missing from this excerpt; the
# fetch-from-API path below only applies when no explicit `other`
# collection was passed in.
1231 if self._manifest_locator is None:
1232 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1233 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
# Skip records we have already merged, unless the server record equals
# our current state (in which case updating is a harmless no-op that
# refreshes our metadata).
1234 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1235 response.get("portable_data_hash") != self.portable_data_hash()):
1236 # The record on the server is different from our current one, but we've seen it before,
1237 # so ignore it because it's already been merged.
1238 # However, if it's the same as our current record, proceed with the update, because we want to update
# NOTE(review): originals 1239-1241 (presumably the early 'return' and
# an 'else:') are missing from this excerpt.
1242 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1243 other = CollectionReader(response["manifest_text"])
# Three-way merge: diff our last-known manifest against `other` and
# apply the resulting change list to this collection.
1244 baseline = CollectionReader(self._manifest_text)
1245 self.apply(baseline.diff(other))
1246 self._manifest_text = self.manifest_text()
# Lazily construct the API client (and reuse its Keep client).
# NOTE(review): the 'def _my_api(self):' line and decorator (originals
# ~1248-1249) are missing from this excerpt.
1250 if self._api_client is None:
1251 self._api_client = ThreadSafeApiCache(self._config)
1252 if self._keep_client is None:
1253 self._keep_client = self._api_client.keep
1254 return self._api_client
# Lazily construct the Keep client, preferring the API client's one.
# NOTE(review): the 'def _my_keep(self):' line (original ~1257) and the
# intermediate lines 1260-1261 are missing from this excerpt.
1258 if self._keep_client is None:
1259 if self._api_client is None:
1262 self._keep_client = KeepClient(api_client=self._api_client)
1263 return self._keep_client
# Lazily create the shared block manager, sized by the desired
# replication (falling back to the API server's default).
1266 def _my_block_manager(self):
1267 if self._block_manager is None:
1268 copies = (self.replication_desired or
1269 self._my_api()._rootDesc.get('defaultCollectionReplication',
# NOTE(review): the fallback default argument and closing parens
# (original 1270) are missing from this excerpt.
1271 self._block_manager = _BlockManager(self._my_keep(), copies=copies)
1272 return self._block_manager
def _remember_api_response(self, response):
    """Record an API server response for later inspection.

    Keeps the raw response (exposed via api_response()) and registers
    its (modified_at, portable_data_hash) pair so update() can
    recognize versions that were already merged.
    """
    self._api_response = response
    version_key = (response.get("modified_at"),
                   response.get("portable_data_hash"))
    self._past_versions.add(version_key)
1278 def _populate_from_api_server(self):
1279 # As in KeepClient itself, we must wait until the last
1280 # possible moment to instantiate an API client, in order to
1281 # avoid tripping up clients that don't have access to an API
1282 # server. If we do build one, make sure our Keep client uses
1283 # it. If instantiation fails, we'll fall back to the except
1284 # clause, just like any other Collection lookup
1285 # failure. Return an exception, or None if successful.
# NOTE(review): the 'try:' line (original 1286) is missing from this
# excerpt.
1287 self._remember_api_response(self._my_api().collections().get(
1288 uuid=self._manifest_locator).execute(
1289 num_retries=self.num_retries))
1290 self._manifest_text = self._api_response['manifest_text']
1291 # If not overriden via kwargs, we should try to load the
1292 # replication_desired from the API server
1293 if self.replication_desired is None:
1294 self.replication_desired = self._api_response.get('replication_desired', None)
# NOTE(review): the return statements (originals 1295 'return None'
# and 1297-1298 'return e', presumably) are missing from this excerpt.
1296 except Exception as e:
1299 def _populate_from_keep(self):
1300 # Retrieve a manifest directly from Keep. This has a chance of
1301 # working if [a] the locator includes a permission signature
1302 # or [b] the Keep services are operating in world-readable
1303 # mode. Return an exception, or None if successful.
# NOTE(review): the 'try:' line (original 1304) and the return
# statements (originals 1308-1309, presumably 'return e'/'return
# None') are missing from this excerpt.
1305 self._manifest_text = self._my_keep().get(
1306 self._manifest_locator, num_retries=self.num_retries)
1307 except Exception as e:
# Load the manifest: try Keep for signed locators, then the API
# server, then Keep again for unsigned keep locators; raise
# NotFoundError if everything fails.
1310 def _populate(self):
# Nothing to load for a brand-new, empty collection.
1311 if self._manifest_locator is None and self._manifest_text is None:
1313 error_via_api = None
1314 error_via_keep = None
1315 should_try_keep = ((self._manifest_text is None) and
1316 util.keep_locator_pattern.match(
1317 self._manifest_locator))
# Signed locators can be fetched from Keep without an API server.
1318 if ((self._manifest_text is None) and
1319 util.signed_locator_pattern.match(self._manifest_locator)):
1320 error_via_keep = self._populate_from_keep()
1321 if self._manifest_text is None:
1322 error_via_api = self._populate_from_api_server()
# NOTE(review): the body of this branch (original 1324, presumably
# 'raise error_via_api') is missing from this excerpt.
1323 if error_via_api is not None and not should_try_keep:
1325 if ((self._manifest_text is None) and
1326 not error_via_keep and
# NOTE(review): original 1327 (presumably 'should_try_keep):') is
# missing from this excerpt.
1328 # Looks like a keep locator, and we didn't already try keep above
1329 error_via_keep = self._populate_from_keep()
1330 if self._manifest_text is None:
# All sources failed: report both errors for diagnosis.
1332 raise errors.NotFoundError(
1333 ("Failed to retrieve collection '{}' " +
1334 "from either API server ({}) or Keep ({})."
# NOTE(review): the format() call and error arguments (originals 1335,
# 1337-1339) are missing from this excerpt.
1336 self._manifest_locator,
# Remember the loaded text as the merge baseline, then build the
# in-memory file/collection tree from it.
1340 self._baseline_manifest = self._manifest_text
1341 self._import_manifest(self._manifest_text)
def _has_collection_uuid(self):
    """True when the manifest locator is set and looks like a
    collection UUID (as opposed to a portable data hash).

    Normalized with bool(): the original returned a mix of False,
    None and re match objects depending on the branch taken; callers
    only use it in boolean context, so bool() is fully
    backward-compatible while giving a consistent predicate.
    """
    locator = self._manifest_locator
    return bool(locator is not None and
                re.match(util.collection_uuid_pattern, locator))
# Context-manager entry.
# NOTE(review): the body (original 1348, presumably 'return self') is
# missing from this excerpt.
1347 def __enter__(self):
1350 def __exit__(self, exc_type, exc_value, traceback):
1351 """Support scoped auto-commit in a with: block."""
# Only auto-save when the block exited cleanly and this object is a
# writable collection backed by an API record.
1352 if exc_type is None:
1353 if self.writable() and self._has_collection_uuid():
# NOTE(review): the remaining body (originals 1354-1356, presumably
# self.save() and self.stop_threads()) is missing from this excerpt.
def stop_threads(self):
    """Shut down the block manager's worker threads, if a block
    manager was ever created."""
    manager = self._block_manager
    if manager is not None:
        manager.stop_threads()
1362 def manifest_locator(self):
    """Get the manifest locator, if any.

    The manifest locator will be set when the collection is loaded
    from an API server record or the portable data hash of a manifest.

    The manifest locator will be None if the collection is newly
    created or was created directly from manifest text.  The method
    `save_new()` will assign a manifest locator.
    """
1373 return self._manifest_locator
# Create a deep copy of this collection, optionally read-only and/or
# under a different parent/config.
1376 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1377 if new_config is None:
1378 new_config = self._config
# NOTE(review): the branch lines (original 1379 'if readonly:' and
# 1381 'else:', presumably) are missing from this excerpt; the
# CollectionReader path is the readonly clone.
1380 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1382 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1384 newcollection._clonefrom(self)
1385 return newcollection
1388 def api_response(self):
    """Returns information about this Collection fetched from the
    API server.

    If the Collection exists in Keep but not the API server, currently
    returns None.  Future versions may provide a synthetic response.
    """
1395 return self._api_response
# Path-normalizing overrides: strip a leading "./" before delegating
# to the RichCollectionBase implementations.
1397 def find_or_create(self, path, create_type):
1398 """See `RichCollectionBase.find_or_create`"""
# NOTE(review): the "." special case (originals 1399-1401, presumably
# returning self) is missing from this excerpt.
1402 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1404 def find(self, path):
1405 """See `RichCollectionBase.find`"""
# NOTE(review): originals 1406-1408 (presumably the "." special case)
# are missing from this excerpt.
1409 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1411 def remove(self, path, recursive=False):
1412 """See `RichCollectionBase.remove`"""
# Removing the root itself is not allowed.
1414 raise errors.ArgumentError("Cannot remove '.'")
1416 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1421 def save(self, merge=True, num_retries=None):
1422 """Save collection to an existing collection record.
1424 Commit pending buffer blocks to Keep, merge with remote record (if
1425 merge=True, the default), and update the collection record. Returns
1426 the current manifest text.
1428 Will raise AssertionError if not associated with a collection record on
1429 the API server. If you want to save a manifest to Keep only, see
1433 Update and merge remote changes before saving. Otherwise, any
1434 remote changes will be ignored and overwritten.
1437 Retry count on API calls (if None, use the collection default)
# Nothing to do unless there are uncommitted local changes.
1440 if not self.committed():
1441 if not self._has_collection_uuid():
1442 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
# Flush all pending buffer blocks to Keep before building the final
# manifest.
1444 self._my_block_manager().commit_all()
# NOTE(review): originals 1445-1448 (presumably the merge/update()
# step guarded by the `merge` flag) are missing from this excerpt.
1449 text = self.manifest_text(strip=False)
1450 self._remember_api_response(self._my_api().collections().update(
1451 uuid=self._manifest_locator,
1452 body={'manifest_text': text}
# NOTE(review): original 1453 (presumably ').execute(') is missing
# from this excerpt.
1454 num_retries=num_retries))
1455 self._manifest_text = self._api_response["manifest_text"]
1456 self.set_committed()
1458 return self._manifest_text
# NOTE(review): signature lines 1466 and 1468 (presumably owner_uuid
# and num_retries parameters) are missing from this excerpt.
1464 def save_new(self, name=None,
1465 create_collection_record=True,
1467 ensure_unique_name=False,
1469 """Save collection to a new collection record.
1471 Commit pending buffer blocks to Keep and, when create_collection_record
1472 is True (default), create a new collection record. After creating a
1473 new collection record, this Collection object will be associated with
1474 the new record used by `save()`. Returns the current manifest text.
1477 The collection name.
1479 :create_collection_record:
1480 If True, create a collection record on the API server.
1481 If False, only commit blocks to Keep and return the manifest text.
1484 the user, or project uuid that will own this collection.
1485 If None, defaults to the current user.
1487 :ensure_unique_name:
1488 If True, ask the API server to rename the collection
1489 if it conflicts with a collection with the same name and owner. If
1490 False, a name conflict will result in an error.
1493 Retry count on API calls (if None, use the collection default)
# Flush all pending buffer blocks to Keep before building the final
# manifest.
1496 self._my_block_manager().commit_all()
1497 text = self.manifest_text(strip=False)
1499 if create_collection_record:
# Unnamed collections get a default name; allow the API server to
# de-duplicate it.
1501 name = "New collection"
1502 ensure_unique_name = True
1504 body = {"manifest_text": text,
# NOTE(review): original 1505 (presumably the "name" key) is missing
# from this excerpt.
1506 "replication_desired": self.replication_desired}
1508 body["owner_uuid"] = owner_uuid
1510 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1511 text = self._api_response["manifest_text"]
# Associate this object with the newly created record so save() works
# from now on.
1513 self._manifest_locator = self._api_response["uuid"]
1515 self._manifest_text = text
1516 self.set_committed()
# Tokenize a manifest and build the in-memory collection tree.
# State machine: STREAM_NAME -> BLOCKS -> SEGMENTS per line.
1521 def _import_manifest(self, manifest_text):
1522 """Import a manifest into a `Collection`.
1525 The manifest text to import from.
# NOTE(review): 'ArgumentError' below is not qualified with 'errors.'
# as every other raise in this file is — if this branch ever fires it
# would raise NameError instead, unless ArgumentError is imported
# elsewhere in the file.  Worth confirming upstream.
1529 raise ArgumentError("Can only import manifest into an empty collection")
# Walk the manifest token by token; the separator tells us when a
# stream line ends.
1538 for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1539 tok = token_and_separator.group(1)
1540 sep = token_and_separator.group(2)
1542 if state == STREAM_NAME:
1543 # starting a new stream
# Manifest escapes spaces in names as \040.
1544 stream_name = tok.replace('\\040', ' ')
1549 self.find_or_create(stream_name, COLLECTION)
# Block locator token: md5+size(+hints); accumulate stream offsets.
1553 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
# NOTE(review): `long` is Python 2 only.
1555 blocksize = long(block_locator.group(1))
1556 blocks.append(Range(tok, streamoffset, blocksize, 0))
1557 streamoffset += blocksize
# File segment token: position:size:filename.
1561 if state == SEGMENTS:
1562 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1564 pos = long(file_segment.group(1))
1565 size = long(file_segment.group(2))
1566 name = file_segment.group(3).replace('\\040', ' ')
1567 filepath = os.path.join(stream_name, name)
1568 afile = self.find_or_create(filepath, FILE)
1569 if isinstance(afile, ArvadosFile):
1570 afile.add_segment(blocks, pos, size)
1572 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1575 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
# A freshly imported collection has no local modifications.
1581 self.set_committed()
# Root-collection notify: deliver the event to the local callback only
# (there is no parent to propagate to).
1584 def notify(self, event, collection, name, item):
# NOTE(review): the guard line (original 1585, presumably
# 'if self._callback:') is missing from this excerpt.
1586 self._callback(event, collection, name, item)
1589 class Subcollection(RichCollectionBase):
    """A subdirectory within a collection that doesn't have its own
    API server record.

    Subcollection locking falls under the umbrella lock of its root
    collection.
    """
1597 def __init__(self, parent, name):
1598 super(Subcollection, self).__init__(parent)
# Share the root collection's lock: all access locks the whole tree.
1599 self.lock = self.root_collection().lock
1600 self._manifest_text = None
# NOTE(review): original 1601 (presumably 'self.name = name') is
# missing from this excerpt; stream_name() below reads self.name.
1602 self.num_retries = parent.num_retries
def root_collection(self):
    """The root of a subcollection is wherever its parent's root is."""
    return self.parent.root_collection()
# Delegation bodies: subcollections have no state of their own for
# writability, API access, or Keep access — everything comes from the
# root collection.
# NOTE(review): the 'def' lines for these three methods (originals
# ~1607, ~1610, ~1613 — presumably writable(), _my_api(), _my_keep())
# are missing from this excerpt.
1608 return self.root_collection().writable()
1611 return self.root_collection()._my_api()
1614 return self.root_collection()._my_keep()
def _my_block_manager(self):
    """Subcollections have no block manager of their own; delegate to
    the root collection's."""
    root = self.root_collection()
    return root._my_block_manager()
def stream_name(self):
    """Build this subdirectory's stream name by appending its own name
    to the parent's stream name."""
    parent_stream = self.parent.stream_name()
    return os.path.join(parent_stream, self.name)
# Deep-copy this subcollection under a new parent/name.
1623 def clone(self, new_parent, new_name):
1624 c = Subcollection(new_parent, new_name)
# NOTE(review): originals 1625-1627 (presumably c._clonefrom(self) and
# 'return c') are missing from this excerpt.
# Detach from the current parent and re-attach under a new one.
1630 def _reparent(self, newparent, newname):
1631 self._committed = False
# NOTE(review): original 1632 (presumably a flush/commit step) and
# 1635 (presumably 'self.name = newname') are missing from this
# excerpt.
1633 self.parent.remove(self.name, recursive=True)
1634 self.parent = newparent
1636 self.lock = self.parent.root_collection().lock
1639 class CollectionReader(Collection):
    """A read-only collection object.

    Initialize from an api collection record locator, a portable data
    hash of a manifest, or raw manifest text.  See the `Collection`
    constructor for details.
    """
def __init__(self, manifest_locator_or_text, *args, **kwargs):
    """Build a read-only view of a collection.

    Accepts the same arguments as Collection.  Mutation is permitted
    only internally while the base constructor runs (tracked by
    _in_init).
    """
    self._in_init = True
    super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
    self._in_init = False

    # Forego any locking since it should never change once initialized.
    self.lock = NoopLock()

    # Backwards compatability with old CollectionReader
    # all_streams() and all_files(): lazily-populated stream cache.
    self._streams = None
# writable() override: a CollectionReader is only mutable while its
# own __init__ is still running.
# NOTE(review): the 'def writable(self):' line (original ~1659) is
# missing from this excerpt.
1660 return self._in_init
# Decorator for the legacy stream API: ensures self._streams is built
# from the manifest before the wrapped method runs.
1662 def _populate_streams(orig_func):
1663 @functools.wraps(orig_func)
1664 def populate_streams_wrapper(self, *args, **kwargs):
1665 # Defer populating self._streams until needed since it creates a copy of the manifest.
1666 if self._streams is None:
1667 if self._manifest_text:
# One token list per non-empty manifest line.
1668 self._streams = [sline.split()
1669 for sline in self._manifest_text.split("\n")
# NOTE(review): originals 1670-1672 (presumably the 'if sline]' filter
# and the 'else: self._streams = []' branch) are missing from this
# excerpt.
1673 return orig_func(self, *args, **kwargs)
1674 return populate_streams_wrapper
1677 def normalize(self):
1678 """Normalize the streams returned by `all_streams`.
1680 This method is kept for backwards compatability and only affects the
1681 behavior of `all_streams()` and `all_files()`
# NOTE(review): originals 1682-1686 (presumably the docstring close
# and 'streams = {}') are missing from this excerpt.
# Regroup every file's byte ranges by (stream, filename).
1687 for s in self.all_streams():
1688 for f in s.all_files():
1689 streamname, filename = split(s.name() + "/" + f.name())
1690 if streamname not in streams:
1691 streams[streamname] = {}
1692 if filename not in streams[streamname]:
1693 streams[streamname][filename] = []
1694 for r in f.segments:
1695 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
# Rebuild the cached token lists in normalized, sorted-stream order.
1697 self._streams = [normalize_stream(s, streams[s])
1698 for s in sorted(streams)]
def all_streams(self):
    """Return a StreamReader for every stream in the manifest
    (legacy API, backed by the _streams token cache)."""
    keep = self._my_keep()
    retries = self.num_retries
    return [StreamReader(tokens, keep, num_retries=retries)
            for tokens in self._streams]
1705 def all_files(self):
1706 for s in self.all_streams():
1707 for f in s.all_files():