10 from collections import deque
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
23 from arvados.retry import retry_method
# Module-level logger shared by the collection classes below.
_logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    """Shared base for collection readers/writers: lazy Keep client setup
    and manifest hint-stripping."""

    def __exit__(self, exc_type, exc_value, traceback):
    # NOTE(review): the __exit__ body is not visible in this excerpt.

    # NOTE(review): the enclosing `def` for the next lines (a lazy Keep
    # client accessor, presumably `_my_keep`) is not visible here.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """Get the manifest with locator hints stripped.

        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        # Rebuild each manifest line: keep the stream name (first field)
        # and strip any non-size hints from locator tokens.
        for line in raw.split("\n"):
            clean_fields = fields[:1] + [
                (re.sub(r'\+[^\d][^\+]*', '', x)
                 if re.match(util.keep_locator_pattern, x)
            # NOTE(review): the split into `fields`, the end of this list
            # comprehension, and the `clean` initialization/return are not
            # visible in this excerpt.
            clean += [' '.join(clean_fields), "\n"]
class _WriterFile(_FileLikeObjectBase):
    """Write-only file object handed out by CollectionWriter.open();
    forwards writes to the owning CollectionWriter."""

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        # The CollectionWriter that receives this file's data.
        self.dest = coll_writer

    # NOTE(review): the enclosing `def close(self)` line is not visible.
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
    # NOTE(review): body not visible (presumably self.dest.write(data)).

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
    # NOTE(review): body not visible in this excerpt.

    @_FileLikeObjectBase._before_close
    # NOTE(review): the enclosing `def flush(self)` line is not visible.
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        # Fall back to 2 copies when the caller does not choose.
        self.replication = (2 if replication is None else replication)
        self._keep_client = None
        # Data buffered in memory but not yet stored in Keep.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the manifest stream currently being assembled.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work queue: at most one open file, plus pending dirents/trees.
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        # The file object most recently returned by open().
        self._last_open = None
    def __exit__(self, exc_type, exc_value, traceback):
    # NOTE(review): the __exit__ body is not visible in this excerpt.

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        if self._queued_file:
        elif self._queued_dirents:
        elif self._queued_trees:
        # NOTE(review): the branch bodies (calls to the _work_* helpers)
        # and the enclosing loop are not visible in this excerpt.

    def _work_file(self):
        # Stream the queued file into the collection one Keep block at a
        # time, then finish the file and release the queue slot.
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
        self.finish_current_file()
        # NOTE(review): the read loop structure and the conditional close
        # (guarded by _close_file, presumably) are not visible here.
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        # Drain queued directory entries for the tree at the head of
        # _queued_trees: subdirectories are re-queued as trees, plain
        # entries are queued as files.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            # NOTE(review): an `else:` separating the directory and file
            # cases is expected here but not visible in this excerpt.
                self._queue_file(target, dirent)
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        # Expand the head queued tree into directory entries.  When
        # max_manifest_depth == 0 the listing recurses fully so the whole
        # subtree collapses into one stream.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        # NOTE(review): the conditional choosing between queueing dirents
        # (non-empty listing) and popping the tree is not visible here.
            self._queue_dirents(stream_name, d)
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        # Accept either a path or a file-like object; remember whether we
        # opened the file ourselves (and so must close it later).
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        # NOTE(review): the `else:` and the `if filename is None:` guard
        # around the basename default are not visible in this excerpt.
            self._close_file = False
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source
202 def _queue_dirents(self, stream_name, dirents):
203 assert (not self._queued_dirents), "tried to queue more than one tree"
204 self._queued_dirents = deque(sorted(dirents))
206 def _queue_tree(self, path, stream_name, max_manifest_depth):
207 self._queued_trees.append((path, stream_name, max_manifest_depth))
    def write_file(self, source, filename=None):
        """Queue one file (path or file-like object) and write it out."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Queue a directory tree and write it out as manifest streams."""
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        """Buffer newdata, flushing full Keep blocks as they accumulate."""
        if hasattr(newdata, '__iter__'):
        # NOTE(review): the iterable-handling branch body is not visible
        # in this excerpt.
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
        # NOTE(review): the loop body (presumably a flush_data() call) is
        # not visible in this excerpt.
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        # NOTE(review): a guard that only splits when filename is None is
        # expected here but not visible in this excerpt.
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        # Switch streams when the requested path is in a different one.
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open
    def flush_data(self):
        """Upload one buffered Keep block, keeping any remainder buffered."""
        data_buffer = ''.join(self._data_buffer)
        # NOTE(review): the non-empty guard and the Keep put() call between
        # append( and the data slice are not visible in this excerpt.
            self._current_stream_locators.append(
                    data_buffer[0:config.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and begin a new one."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Set the file name; reject names that would corrupt a manifest."""
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
        # NOTE(review): the format operands for the two raises above are
        # not visible in this excerpt.
        self._current_file_name = newfilename

    def current_file_name(self):
        """Return the name of the file currently being written."""
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's extent in the current stream."""
        if self._current_file_name is None:
            # A zero-length unnamed tail is fine; any other unnamed data
            # is an error.
            if self._current_file_pos == self._current_stream_length:
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        # File token: [position, size, name].
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None
    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and begin a new one named
        `newstreamname` (default: the top-level stream '.')."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
307 def set_current_stream_name(self, newstreamname):
308 if re.search(r'[\t\n]', newstreamname):
309 raise errors.AssertionError(
310 "Manifest stream names cannot contain whitespace")
311 self._current_stream_name = '.' if newstreamname=='' else newstreamname
    def current_stream_name(self):
        """Return the name of the stream currently being written."""
        return self._current_stream_name

    def finish_current_stream(self):
        """Record the finished stream and reset per-stream state.

        Appends [name, locators, files] to _finished_streams.
        """
        self.finish_current_file()
        # NOTE(review): a flush of buffered data before these checks is
        # not visible in this excerpt.
        if not self._current_stream_files:
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        # An empty stream still needs one locator: the empty block.
        if not self._current_stream_locators:
            self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
        self._finished_streams.append([self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
    # NOTE(review): the enclosing `def` (presumably `save`) is not visible
    # in this excerpt.
        """Store the manifest in Keep and return its locator.

        This is useful for storing manifest fragments (task outputs)
        temporarily in Keep during a Crunch job.

        In other cases you should make a collection instead, by
        sending manifest_text() to the API server's "create
        collection" endpoint.
        """
        return self._my_keep().put(self.manifest_text(), copies=self.replication)
350 def portable_data_hash(self):
351 stripped = self.stripped_manifest()
352 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
    def manifest_text(self):
        """Build and return the manifest text for all finished streams."""
        self.finish_current_stream()
        # NOTE(review): the `manifest` initialization, the raise for bad
        # stream names, the per-line newline, and the final return are not
        # visible in this excerpt.
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
            # Stream name, space-escaped per the manifest format.
            manifest += stream[0].replace(' ', '\\040')
            # Block locators.
            manifest += ' ' + ' '.join(stream[1])
            # File tokens: position:size:name.
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])

    def data_locators(self):
        """Collect the block locators of all finished streams.

        NOTE(review): the loop body and return are not visible in this
        excerpt.
        """
        for name, locators, files in self._finished_streams:
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter that can checkpoint its state and resume later."""

    # Writer attributes captured by dump_state() / restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        # Map of source path -> stat tuple, used to detect stale state.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
    # NOTE(review): a @classmethod decorator is expected here but is not
    # visible in this excerpt.
    def from_state(cls, state, *init_args, **init_kwargs):
        """Rebuild a writer from a dump_state() snapshot.

        Try to build a new writer from scratch with the given state.
        If the state is not suitable to resume (because files have changed,
        been deleted, aren't predictable, etc.), raise a
        StaleWriterStateError.  Otherwise, return the initialized writer.
        The caller is responsible for calling writer.do_queued_work()
        appropriately after it's returned.
        """
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            # NOTE(review): the `try:` opening this handler, and the final
            # return of `writer`, are not visible in this excerpt.
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))

    def check_dependencies(self):
        """Raise StaleWriterStateError if any recorded source changed."""
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            # NOTE(review): the `try:` opening this handler is not visible
            # in this excerpt.
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            # Still a regular file with identical mtime and size means
            # the dependency is unchanged.
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a snapshot dict of writer state for from_state().

        `copy_func` may be supplied to deep-copy values as captured.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        # NOTE(review): the `else:` branch marker and the final
        # `return state` are not visible in this excerpt.
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())

    def _queue_file(self, source, filename=None):
        """Queue a file for writing, recording stat info for resume checks.

        Only real file paths to regular files can be resumed; other
        sources are rejected or flagged accordingly.
        """
        # NOTE(review): the `try:` opening this stat attempt and the
        # non-string-source raise placement are only partially visible in
        # this excerpt.
            src_path = os.path.realpath(source)
            raise errors.AssertionError("{} not a file path".format(source))
            path_stat = os.stat(src_path)
        except OSError as stat_error:
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        # NOTE(review): the final `else:` marker is not visible here.
            self._dependencies[src_path] = tuple(fd_stat)
467 def write(self, data):
468 if self._queued_file is None:
469 raise errors.AssertionError(
470 "resumable writer can't accept unsourced data")
471 return super(ResumableCollectionWriter, self).write(data)
# Token passed to find_or_create() to request a subcollection.
# NOTE(review): sibling constants (e.g. FILE and the ADD/DEL/MOD event
# names used below) are defined nearby but not visible in this excerpt.
COLLECTION = "collection"
class RichCollectionBase(CollectionBase):
    """Base class for Collections and Subcollections.

    Implements the majority of functionality relating to accessing items in the
    Collection.
    """

    def __init__(self, parent=None):
        # Newly-constructed collections start out marked modified.
        self._modified = True
        # NOTE(review): additional initialization (e.g. the _items dict
        # used throughout this class, and the parent reference) is not
        # visible in this excerpt.

    # Abstract hooks that concrete subclasses must provide.  Several of
    # the enclosing `def` lines are not visible in this excerpt.
        raise NotImplementedError()

        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

        raise NotImplementedError()

    def root_collection(self):
        raise NotImplementedError()

    def notify(self, event, collection, name, item):
        raise NotImplementedError()

    def stream_name(self):
        raise NotImplementedError()

    def find_or_create(self, path, create_type):
        """Recursively search the specified file path.

        May return either a `Collection` or `ArvadosFile`.  If not found, will
        create a new item at the specified path based on `create_type`.  Will
        create intermediate subcollections needed to contain the final item in
        the path.

        :create_type:
          One of `arvados.collection.FILE` or
          `arvados.collection.COLLECTION`.  If the path is not found, and value
          of create_type is FILE then create and return a new ArvadosFile for
          the last path component.  If COLLECTION, then create and return a new
          Collection for the last path component.
        """
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                # NOTE(review): the branch structure around item creation
                # (missing-item guard, FILE vs COLLECTION else) is only
                # partially visible in this excerpt.
                if create_type == COLLECTION:
                    item = Subcollection(self)
                    item = ArvadosFile(self)
                self._items[pathcomponents[0]] = item
                self._modified = True
                self.notify(ADD, self, pathcomponents[0], item)
            # create new intermediate collection for the interior path
            # component, then recurse with the remainder.
                item = Subcollection(self)
                self._items[pathcomponents[0]] = item
                self._modified = True
                self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    return item.find_or_create(pathcomponents[1], create_type)
                raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))

    def find(self, path):
        """Recursively search the specified file path.

        May return either a Collection or ArvadosFile.  Return None if not
        found.
        """
        # NOTE(review): the empty-path guard preceding this raise is not
        # visible in this excerpt.
            raise errors.ArgumentError("Parameter 'path' must not be empty.")
        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        if len(pathcomponents) == 1:
        if isinstance(item, RichCollectionBase):
            if pathcomponents[1]:
                return item.find(pathcomponents[1])
        raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))

    # NOTE(review): the enclosing `def` (presumably `mkdirs(self, path)`)
    # is not visible in this excerpt.
        """Recursive subcollection create.

        Like `os.mkdirs()`.  Will create intermediate subcollections needed to
        contain the leaf subcollection path.
        """
        return self.find_or_create(path, COLLECTION)
    def open(self, path, mode="r"):
        """Open a file-like object for access.

        :path:
          path to a file in the collection
        :mode:
          one of "r", "r+", "w", "w+", "a", "a+"
          :"r+":
            opens for reading and writing.  Reads/writes share a file pointer.
          :"w", "w+":
            truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
          :"a", "a+":
            opens for reading and writing.  All writes are appended to
            the end of the file.  Writing does not affect the file pointer for
            reading.
        """
        # The binary flag is a no-op for collection files.
        mode = mode.replace("b", "")
        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
            raise errors.ArgumentError("Bad mode '%s'" % mode)
        create = (mode != "r")

        if create and not self.writable():
            raise IOError((errno.EROFS, "Collection is read only"))

        # NOTE(review): the conditional choosing between these lookups
        # (create vs read-only) is only partially visible in this excerpt.
            arvfile = self.find_or_create(path, FILE)
            arvfile = self.find(path)

            raise IOError((errno.ENOENT, "File not found"))
        if not isinstance(arvfile, ArvadosFile):
            raise IOError((errno.EISDIR, "Path must refer to a file."))

        name = os.path.basename(path)

        # NOTE(review): the mode test selecting reader vs writer is not
        # visible in this excerpt.
            return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
            return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)

    # NOTE(review): the enclosing `def modified(self)` line is not visible.
        """Test if the collection (or any subcollection or file) has been modified."""
        for k,v in self._items.items():

    def set_unmodified(self):
        """Recursively clear modified flag."""
        self._modified = False
        for k,v in self._items.items():
        # NOTE(review): the loop body is not visible in this excerpt.

    # NOTE(review): the enclosing `def __iter__(self)` line is not visible.
        """Iterate over names of files and collections contained in this collection."""
        return iter(self._items.keys())

    def __getitem__(self, k):
        """Get a file or collection that is directly contained by this collection.

        If you want to search a path, use `find()` instead.
        """
        return self._items[k]

    def __contains__(self, k):
        """Test if there is a file or collection directly contained by this collection."""
        return k in self._items

    # NOTE(review): the enclosing `def __len__(self)` line is not visible.
        """Get the number of items directly contained in this collection."""
        return len(self._items)

    def __delitem__(self, p):
        """Delete an item by name which is directly contained by this collection."""
        # NOTE(review): the actual `del` statement is not visible here.
        self._modified = True
        self.notify(DEL, self, p, None)

    # NOTE(review): the enclosing `def keys(self)` line is not visible.
        """Get a list of names of files and collections directly contained in this collection."""
        return self._items.keys()

    # NOTE(review): the enclosing `def values(self)` line is not visible.
        """Get a list of files and collection objects directly contained in this collection."""
        return self._items.values()

    # NOTE(review): the enclosing `def items(self)` line is not visible.
        """Get a list of (name, object) tuples directly contained in this collection."""
        return self._items.items()

    def exists(self, path):
        """Test if there is a file or collection at `path`."""
        return self.find(path) is not None
    def remove(self, path, recursive=False):
        """Remove the file or subcollection (directory) at `path`.

        :recursive:
          Specify whether to remove non-empty subcollections (True), or raise an error (False).
        """
        # NOTE(review): the empty-path guard, missing-item guard, and the
        # recursive `else:` branch marker are not visible in this excerpt.
            raise errors.ArgumentError("Parameter 'path' must not be empty.")
        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
            raise IOError((errno.ENOENT, "File not found"))
        if len(pathcomponents) == 1:
            # Refuse to remove a non-empty subcollection unless recursive.
            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
            deleteditem = self._items[pathcomponents[0]]
            del self._items[pathcomponents[0]]
            self._modified = True
            self.notify(DEL, self, pathcomponents[0], deleteditem)
        # Otherwise recurse into the subcollection.
            item.remove(pathcomponents[1])

    def _clonefrom(self, source):
        # Clone each child of `source` into this collection.
        for k,v in source.items():
            self._items[k] = v.clone(self)

    # NOTE(review): the enclosing `def clone(...)` line is not visible.
        raise NotImplementedError()

    def add(self, source_obj, target_name, overwrite=False):
        """Copy a file or subcollection to this collection.

        :source_obj:
          An ArvadosFile, or Subcollection object

        :target_name:
          Destination item name.  If the target name already exists and is a
          file, this will raise an error unless you specify `overwrite=True`.

        :overwrite:
          Whether to overwrite target file if it already exists.
        """
        if target_name in self and not overwrite:
            raise IOError((errno.EEXIST, "File already exists"))

        # Remember the item being replaced so observers can be told.
        if target_name in self:
            modified_from = self[target_name]

        # Actually make the copy.
        dup = source_obj.clone(self)
        self._items[target_name] = dup
        self._modified = True

        # NOTE(review): the conditional choosing between the MOD and ADD
        # notifications is not visible in this excerpt.
            self.notify(MOD, self, target_name, (modified_from, dup))
            self.notify(ADD, self, target_name, dup)

    def copy(self, source, target_path, source_collection=None, overwrite=False):
        """Copy a file or subcollection to a new path in this collection.

        :source:
          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.

        :target_path:
          Destination file or path.  If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection.  If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to copy `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.
        """
        if source_collection is None:
            source_collection = self

        # Find the object to copy
        if isinstance(source, basestring):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError((errno.ENOENT, "File not found"))
            sourcecomponents = source.split("/")
        # NOTE(review): the `else:` branch taking `source` as an object is
        # only partially visible in this excerpt.
            sourcecomponents = None

        # Find parent collection the target path
        targetcomponents = target_path.split("/")

        # Determine the name to use.
        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)

        # NOTE(review): the missing-target-name guard preceding this raise
        # is not visible in this excerpt.
            raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")

        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)

        # If the target is an existing subcollection, place the item inside
        # it under the source's own name.
        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        target_dir.add(source_obj, target_name, overwrite)
    def portable_manifest_text(self, stream_name="."):
        """Get the manifest text for this collection, sub collections and files.

        This method does not flush outstanding blocks to Keep.  It will return
        a normalized manifest with access tokens stripped.

        :stream_name:
          Name to use for this stream (directory)
        """
        return self._get_manifest_text(stream_name, True, True)

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        """Get the manifest text for this collection, sub collections and files.

        This method will flush outstanding blocks to Keep.  By default, it will
        not normalize an unmodified manifest or strip access tokens.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.
        """
        self._my_block_manager().commit_all()
        return self._get_manifest_text(stream_name, strip, normalize)

    def _get_manifest_text(self, stream_name, strip, normalize):
        """Build manifest text; same parameters as manifest_text()."""
        # Rebuild from the in-memory tree only when needed; otherwise the
        # cached/original text is used below.
        if self.modified() or self._manifest_text is None or normalize:
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `k`
                arvfile = self[filename]
                for segment in arvfile.segments():
                    loc = segment.locator
                    # Resolve pending buffer blocks to their committed
                    # locators.
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    # NOTE(review): a strip conditional around this line
                    # is expected but not visible in this excerpt.
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                                      segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
        # NOTE(review): initialization of buf/stream/filestream and the
        # else branch's strip conditional are not visible in this excerpt.
            return self.stripped_manifest()
            return self._manifest_text
    def diff(self, end_collection, prefix=".", holding_collection=None):
        """Generate list of add/modify/delete actions.

        When given to `apply`, will change `self` to match `end_collection`.
        """
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        # NOTE(review): the `changes` initialization, the enclosing loop
        # for deletions, the per-key membership test, the ADD `else:`, and
        # the final return are not visible in this excerpt.
            if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
        for k in end_collection:
                # Recurse into matching subcollections; otherwise record a
                # modification when the items differ.
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                elif end_collection[k] != self[k]:
                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))

    def apply(self, changes):
        """Apply changes from `diff`.

        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.
        """
        for change in changes:
            event_type = change[0]
            # NOTE(review): unpacking of path/initial/final from `change`
            # and several branch markers are not visible in this excerpt.
            local = self.find(path)
            # Conflict files carry a timestamp so repeated merges differ.
            conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
            if event_type == ADD:
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
    def portable_data_hash(self):
        """Get the portable data hash for this collection's manifest."""
        stripped = self.portable_manifest_text()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def __eq__(self, other):
        """Equal when `other` is the same kind, same size, and every child
        item compares equal."""
        if not isinstance(other, RichCollectionBase):
        if len(self._items) != len(other):
        for k in self._items:
            if self._items[k] != other[k]:
        # NOTE(review): the True/False return statements of this
        # comparison are not visible in this excerpt.

    def __ne__(self, other):
        """Inverse of __eq__."""
        return not self.__eq__(other)
class Collection(RichCollectionBase):
    """Represents the root of an Arvados Collection.

    This class is threadsafe.  The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the entire
    collection).

    Brief summary of useful operations:

    :To read an existing file:
      `c.open("myfile", "r")`

    :To write a new file:
      `c.open("myfile", "w")`

    :To determine if a file exists:
      `c.find("myfile") is not None`

    :To copy a file:
      `c.copy("source", "dest")`

    :To delete a file:
      `c.remove("myfile")`

    :To save to an existing collection record:
      `c.save()`

    :To save a new collection record:
      `c.save_new()`

    :To merge remote changes into this object:
      `c.update()`

    Must be associated with an API server Collection record (during
    initialization, or using `save_new`) to use `save` or `update`
    """
    def __init__(self, manifest_locator_or_text=None,
                 # NOTE(review): the intermediate keyword parameters
                 # (parent, apiconfig, api_client, keep_client,
                 # num_retries -- all referenced in the body below) are
                 # not visible in this excerpt.
                 block_manager=None):
        """Collection constructor.

        :manifest_locator_or_text:
          One of Arvados collection UUID, block locator of
          a manifest, raw manifest text, or None (to create an empty collection).
        :parent:
          the parent Collection, may be None.
        :apiconfig:
          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.
        :api_client:
          The API client object to use for requests.  If not specified, create one using `apiconfig`.
        :keep_client:
          the Keep client to use for requests.  If not specified, create one using `apiconfig`.
        :num_retries:
          the number of retries for API and Keep requests.
        :block_manager:
          the block manager to use.  If not specified, create one.
        """
        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client
        self._block_manager = block_manager

        # Prefer an explicit apiconfig; otherwise fall back to defaults.
            self._config = apiconfig
            self._config = config.settings()

        self.num_retries = num_retries if num_retries is not None else 0
        self._manifest_locator = None
        self._manifest_text = None
        self._api_response = None

        # One re-entrant lock guards the whole collection tree.
        self.lock = threading.RLock()

        if manifest_locator_or_text:
            # Accept a Keep locator, a collection UUID, or raw manifest
            # text, and record it in the matching attribute.
            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
            # NOTE(review): the final `else:` marker, and the try: around
            # the population step, are not visible in this excerpt.
                raise errors.ArgumentError(
                    "Argument to CollectionReader must be a manifest or a collection UUID")

            except (IOError, errors.SyntaxError) as e:
                raise errors.ArgumentError("Error processing manifest text: %s", e)
1104 def root_collection(self):
1107 def stream_name(self):
    def update(self, other=None, num_retries=None):
        """Merge the latest collection on the API server with the current collection."""
        # NOTE(review): the `if other is None:` guard appears truncated here;
        # the fetch below requires a manifest locator to query the server.
        if self._manifest_locator is None:
            raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
        response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
        other = CollectionReader(response["manifest_text"])
        # Diff the last-known manifest against the server copy and apply the
        # resulting change set to this collection.
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        # Lazily build a thread-safe API client on first use (the enclosing
        # `def _my_api(self):` line appears truncated above this span), and
        # adopt its embedded Keep client so both talk to the same cluster.
        if self._api_client is None:
            self._api_client = ThreadSafeApiCache(self._config)
            self._keep_client = self._api_client.keep
        return self._api_client
        # Lazily build a Keep client on first use (the enclosing
        # `def _my_keep(self):` line appears truncated above this span).
        if self._keep_client is None:
            # NOTE(review): the inner branch body is truncated -- presumably
            # it builds the API client first via _my_api(); confirm upstream.
            if self._api_client is None:
                self._keep_client = KeepClient(api_client=self._api_client)
        return self._keep_client
1143 def _my_block_manager(self):
1144 if self._block_manager is None:
1145 self._block_manager = _BlockManager(self._my_keep())
1146 return self._block_manager
    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server. If we do build one, make sure our Keep client uses
        # it. If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        # Fetch the record by locator; keep the full response so callers can
        # inspect it later via api_response().
        self._api_response = self._my_api().collections().get(
            uuid=self._manifest_locator).execute(
            num_retries=self.num_retries)
        self._manifest_text = self._api_response['manifest_text']
        # NOTE(review): the matching `try:` and the handler/return bodies
        # appear truncated in this span -- confirm against the full source.
        except Exception as e:
    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        self._manifest_text = self._my_keep().get(
            self._manifest_locator, num_retries=self.num_retries)
        # NOTE(review): the matching `try:` and the handler body appear
        # truncated in this span -- confirm against the full source.
        except Exception as e:
    def _populate(self):
        # Nothing to load: a brand-new, empty collection.
        # NOTE(review): the body of this guard is truncated (presumably a
        # bare `return`); confirm against the full source.
        if self._manifest_locator is None and self._manifest_text is None:
        error_via_api = None
        error_via_keep = None
        # Direct Keep retrieval only makes sense for block locators;
        # collection UUIDs are resolvable only through the API server.
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            # A signed locator is likely to be readable from Keep directly.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            # NOTE(review): the body of this branch is truncated here.
            if error_via_api is not None and not should_try_keep:
        # Looks like a keep locator, and we didn't already try keep above
        if ((self._manifest_text is None) and
            not error_via_keep and
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Both retrieval paths failed; report errors from each.
            raise errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 self._manifest_locator,
        # Remember the manifest as loaded so later diffs/merges have a baseline.
        self._baseline_manifest = self._manifest_text
        self._import_manifest(self._manifest_text)
1210 def _has_collection_uuid(self):
1211 return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
    def __enter__(self):
        """Enter a with: block (body truncated here -- presumably returns self)."""
    def __exit__(self, exc_type, exc_value, traceback):
        """Support scoped auto-commit in a with: block."""
        # NOTE(review): the bodies of the first two conditionals appear
        # truncated here -- presumably a save() of a writable collection
        # with an API record; confirm against the full source.
        if exc_type is not None:
            if self.writable() and self._has_collection_uuid():
        if self._block_manager is not None:
            # Always shut down block-manager worker threads on exit.
            self._block_manager.stop_threads()
    def manifest_locator(self):
        """Get the manifest locator, if any.

        The manifest locator will be set when the collection is loaded from an
        API server record or the portable data hash of a manifest.

        The manifest locator will be None if the collection is newly created or
        was created directly from manifest text. The method `save_new()` will
        assign a manifest locator.
        """
        return self._manifest_locator
    def clone(self, new_parent=None, readonly=False, new_config=None):
        # Default to sharing this collection's API configuration.
        if new_config is None:
            new_config = self._config
        # NOTE(review): the choice between the two constructors below appears
        # truncated here -- presumably `readonly` selects CollectionReader
        # over Collection; confirm against the full source.
            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
            newcollection = Collection(parent=new_parent, apiconfig=new_config)
        # Copy this collection's contents into the fresh object.
        newcollection._clonefrom(self)
        return newcollection
    def api_response(self):
        """Returns information about this Collection fetched from the API server.

        If the Collection exists in Keep but not the API server, currently
        returns None. Future versions may provide a synthetic response.
        """
        return self._api_response
    def find_or_create(self, path, create_type):
        """See `RichCollectionBase.find_or_create`"""
        # Strip a leading "./" so paths are resolved relative to this root.
        # NOTE(review): a special case for path == "." appears truncated here.
        return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
    def find(self, path):
        """See `RichCollectionBase.find`"""
        # Strip a leading "./" so paths are resolved relative to this root.
        # NOTE(review): a special case for path == "." appears truncated here.
        return super(Collection, self).find(path[2:] if path.startswith("./") else path)
    def remove(self, path, recursive=False):
        """See `RichCollectionBase.remove`"""
        # NOTE(review): the guard condition for this raise appears truncated
        # here (presumably `if path == ".":`) -- the root itself cannot be
        # removed; confirm against the full source.
            raise errors.ArgumentError("Cannot remove '.'")
        return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
    def save(self, merge=True, num_retries=None):
        """Save collection to an existing collection record.

        Commit pending buffer blocks to Keep, merge with remote record (if
        merge=True, the default), and update the collection record. Returns
        the current manifest text.

        Will raise AssertionError if not associated with a collection record on
        the API server. If you want to save a manifest to Keep only, see
        `save_new()`.

        :merge:
          Update and merge remote changes before saving. Otherwise, any
          remote changes will be ignored and overwritten.

        :num_retries:
          Retry count on API calls (if None, use the collection default)
        """
        if not self._has_collection_uuid():
            raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_new() for new collections.")
        # Flush buffered block data to Keep so the manifest refers only to
        # committed blocks.
        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)
        # NOTE(review): the `).execute(` chaining between the body kwarg and
        # num_retries appears truncated in this span.
        self._api_response = self._my_api().collections().update(
            uuid=self._manifest_locator,
            body={'manifest_text': text}
            num_retries=num_retries)
        # Adopt the server's canonical manifest and clear the modified flag.
        self._manifest_text = self._api_response["manifest_text"]
        self.set_unmodified()
        return self._manifest_text
    def save_new(self, name=None,
                 create_collection_record=True,
                 ensure_unique_name=False,
        """Save collection to a new collection record.

        Commit pending buffer blocks to Keep and, when create_collection_record
        is True (default), create a new collection record. After creating a
        new collection record, this Collection object will be associated with
        the new record used by `save()`. Returns the current manifest text.

        :name:
          The collection name.

        :create_collection_record:
          If True, create a collection record on the API server.
          If False, only commit blocks to Keep and return the manifest text.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner. If
          False, a name conflict will result in an error.

        :num_retries:
          Retry count on API calls (if None, use the collection default)
        """
        # Flush buffered blocks so the manifest refers to committed data.
        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)

        if create_collection_record:
            # NOTE(review): the `if name is None:` guard for this default
            # appears truncated here; confirm against the full source.
            name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
            # NOTE(review): the request body dict continues on truncated
            # lines (presumably "name" and replication settings).
            body = {"manifest_text": text,
            body["owner_uuid"] = owner_uuid
            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
            # Adopt the server's canonical manifest and new record UUID.
            text = self._api_response["manifest_text"]
            self._manifest_locator = self._api_response["uuid"]
            self._manifest_text = text
            self.set_unmodified()
1382 def subscribe(self, callback):
1383 self.callbacks.append(callback)
1386 def unsubscribe(self, callback):
1387 self.callbacks.remove(callback)
1390 def notify(self, event, collection, name, item):
1391 for c in self.callbacks:
1392 c(event, collection, name, item)
    def _import_manifest(self, manifest_text):
        """Import a manifest into a `Collection`.

        :manifest_text:
          The manifest text to import from.
        """
        # NOTE(review): the emptiness guard for this raise appears truncated
        # here; confirm against the full source.
            raise ArgumentError("Can only import manifest into an empty collection")
        # Tokenize the manifest; a small state machine distinguishes stream
        # names, block locators, and file-segment descriptors. State setup
        # lines appear truncated in this span.
        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
            tok = token_and_separator.group(1)
            sep = token_and_separator.group(2)

            if state == STREAM_NAME:
                # starting a new stream
                stream_name = tok.replace('\\040', ' ')
            # A block locator token: md5+size(+hints); accumulate its byte
            # range so file segments can reference offsets into the stream.
            block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
            blocksize = long(block_locator.group(1))  # NOTE(review): long() is Python 2 only
            blocks.append(Range(tok, streamoffset, blocksize))
            streamoffset += blocksize

            if state == SEGMENTS:
                # pos:size:filename triples map byte ranges onto files.
                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
                pos = long(file_segment.group(1))
                size = long(file_segment.group(2))
                name = file_segment.group(3).replace('\\040', ' ')
                filepath = os.path.join(stream_name, name)
                afile = self.find_or_create(filepath, FILE)
                if isinstance(afile, ArvadosFile):
                    afile.add_segment(blocks, pos, size)
                # NOTE(review): presumably an `else:` branch -- the path
                # resolved to a directory, not a file.
                    raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
            raise errors.SyntaxError("Invalid manifest format")
        # Freshly imported content matches its manifest: mark unmodified.
        self.set_unmodified()
class Subcollection(RichCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API

    It falls under the umbrella of the root collection.
    """

    def __init__(self, parent):
        super(Subcollection, self).__init__(parent)
        # Share the root collection's lock so the whole tree synchronizes
        # on a single lock object.
        self.lock = self.root_collection().lock
        self._manifest_text = None

    def root_collection(self):
        # Walk up until the top-level Collection.
        return self.parent.root_collection()

    # NOTE(review): the `def` headers for writable/_my_api/_my_keep appear
    # truncated in this span; each delegates to the root collection, which
    # owns the API/Keep clients for the whole tree.
        return self.root_collection().writable()
        return self.root_collection()._my_api()
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def notify(self, event, collection, name, item):
        # Events bubble up to the root collection's subscribers.
        return self.root_collection().notify(event, collection, name, item)

    def stream_name(self):
        # Locate our own entry in the parent to build a path such as
        # "<parent stream>/<name>". NOTE(review): the identity check
        # (presumably `if v is self:`) appears truncated here.
        for k, v in self.parent.items():
                return os.path.join(self.parent.stream_name(), k)

    def clone(self, new_parent):
        c = Subcollection(new_parent)
class CollectionReader(Collection):
    """A read-only collection object.

    Initialize from an api collection record locator, a portable data hash of a
    manifest, or raw manifest text. See `Collection` constructor for detailed
    options.
    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        # _in_init lets the base constructor mutate state during setup even
        # though the reader is read-only afterwards; presumably read back by
        # writable() -- confirm against the full source.
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()

        # Backwards compatability with old CollectionReader
        # all_streams() and all_files()
        self._streams = None
        # NOTE(review): the enclosing `def writable(self):` line appears
        # truncated above; the reader reports writable only while __init__
        # is still running (the _in_init flag set there).
        return self._in_init
    def _populate_streams(orig_func):
        # Decorator: lazily build the legacy self._streams token lists
        # before running the wrapped method.
        @functools.wraps(orig_func)
        def populate_streams_wrapper(self, *args, **kwargs):
            # Defer populating self._streams until needed since it creates a copy of the manifest.
            if self._streams is None:
                if self._manifest_text:
                    # Each manifest line becomes one list of whitespace-split
                    # tokens. NOTE(review): the comprehension's filter/close
                    # and the empty-manifest branch appear truncated here.
                    self._streams = [sline.split()
                                     for sline in self._manifest_text.split("\n")
            return orig_func(self, *args, **kwargs)
        return populate_streams_wrapper
    def normalize(self):
        """Normalize the streams returned by `all_streams`.

        This method is kept for backwards compatability and only affects the
        behavior of `all_streams()` and `all_files()`
        """
        # Regroup every file's locator ranges by stream directory name, then
        # rebuild self._streams in normalized, sorted form. NOTE(review): the
        # initialization of the `streams` dict appears truncated here.
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]
1562 def all_streams(self):
1563 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1564 for s in self._streams]
1567 def all_files(self):
1568 for s in self.all_streams():
1569 for f in s.all_files():