from collections import deque

from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
from .keep import KeepLocator, KeepClient
from .stream import StreamReader
from ._normalize_stream import normalize_stream
from ._ranges import Range, LocatorAndRange
from .safeapi import ThreadSafeApiCache
from arvados.retry import retry_method

_logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client
    def stripped_manifest(self):
        """Get the manifest with locator hints stripped.

        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and any
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
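    # Illustrative sketch (not part of the original source): how the
    # hint-stripping regex above treats a signed locator.  The signature
    # and timestamp below are made-up placeholder values.
    #
    #   >>> re.sub(r'\+[^\d][^\+]*', '',
    #   ...        'd41d8cd98f00b204e9800998ecf8427e+0'
    #   ...        '+A0123456789abcdef0123456789abcdef01234567@565fa5aa')
    #   'd41d8cd98f00b204e9800998ecf8427e+0'
    #
    # The +0 size hint starts with a digit, so it survives; the +A...
    # permission hint does not, leaving a portable locator.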
class _WriterFile(_FileLikeObjectBase):
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        self.dest.write(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        for s in seq:
            self.dest.write(s)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self.replication = (2 if replication is None else replication)
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
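    # Usage sketch (not part of the original source; assumes a reachable
    # Arvados API server and Keep service configured in the environment):
    #
    #   cwriter = CollectionWriter()
    #   with cwriter.open('./doc/page1.txt') as outfile:
    #       outfile.write('Hello, Keep!')
    #   manifest = cwriter.manifest_text()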
    def __exit__(self, exc_type, exc_value, traceback):
    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break
    def _work_file(self):
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None
    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()
    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()
    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source
    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
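    # Example sketch: upload a local directory tree as one collection.
    # The path '/tmp/report' is hypothetical; the default
    # max_manifest_depth=-1 recurses fully, one stream per subdirectory.
    #
    #   cwriter = CollectionWriter()
    #   cwriter.write_directory_tree('/tmp/report', stream_name='report')
    #   manifest = cwriter.manifest_text()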
    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open
    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(
                    data_buffer[0:config.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
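    # Buffering note (illustrative): write() accumulates data and calls
    # flush_data() once at least config.KEEP_BLOCK_SIZE bytes (64 MiB in
    # the default configuration) are buffered, so each flush uploads at
    # most one full block.  Writing 100 MiB in one call, for example,
    # stores one 64 MiB block right away and leaves the remaining
    # ~36 MiB buffered until the stream is finished or more data arrives.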
    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name
    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None
    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
339 """Store the manifest in Keep and return its locator.
341 This is useful for storing manifest fragments (task outputs)
342 temporarily in Keep during a Crunch job.
344 In other cases you should make a collection instead, by
345 sending manifest_text() to the API server's "create
346 collection" endpoint.
348 return self._my_keep().put(self.manifest_text(), copies=self.replication)
    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
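    # Worked sketch: the portable data hash is just the md5 of the
    # stripped manifest text plus its length in bytes.  For the toy
    # one-file manifest below (51 bytes), the result has the form
    # '<md5 hex digest>+51':
    #
    #   stripped = '. d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n'
    #   pdh = hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))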
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest
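    # For reference, each manifest line built above has the shape
    #
    #   <stream name> <block locator>... <pos>:<size>:<file name>
    #
    # so a stream "./doc" holding one 3-byte file "a b.txt" in a single
    # block would serialize as (the locator is the real md5 of "foo"):
    #
    #   ./doc acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:a\040b.txt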
    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # it has one.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer
    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))
    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state
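    # Checkpoint/resume sketch (the file path is hypothetical, and how
    # the state dict is persisted between runs is up to the caller):
    #
    #   writer = ResumableCollectionWriter()
    #   writer.write_file('/tmp/bigfile')
    #   state = writer.dump_state()    # snapshot; safe to serialize
    #   # ... later, possibly in a new process ...
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()        # resume any queued file/tree work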
    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)
    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
ADD = "add"
DEL = "del"
MOD = "mod"
FILE = "file"
COLLECTION = "collection"
class SynchronizedCollectionBase(CollectionBase):
    """Base class for Collections and Subcollections.

    Implements the majority of functionality relating to accessing items in
    the Collection.

    """

    def __init__(self, parent=None):
        self.parent = parent
        self._modified = True
        self._items = {}

    def _my_api(self):
        raise NotImplementedError()

    def _my_keep(self):
        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

    def writable(self):
        raise NotImplementedError()

    def root_collection(self):
        raise NotImplementedError()

    def notify(self, event, collection, name, item):
        raise NotImplementedError()

    def stream_name(self):
        raise NotImplementedError()
    def find_or_create(self, path, create_type):
        """Recursively search the specified file path.

        May return either a `Collection` or `ArvadosFile`.  If not found, will
        create a new item at the specified path based on `create_type`.  Will
        create intermediate subcollections needed to contain the final item in
        the path.

        :create_type:
          One of `arvados.collection.FILE` or
          `arvados.collection.COLLECTION`.  If the path is not found and
          create_type is FILE, create and return a new ArvadosFile for
          the last path component.  If COLLECTION, create and return a new
          Collection for the last path component.

        """
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                if item is None:
                    # create new file
                    if create_type == COLLECTION:
                        item = Subcollection(self)
                    else:
                        item = ArvadosFile(self)
                    self._items[pathcomponents[0]] = item
                    self._modified = True
                    self.notify(ADD, self, pathcomponents[0], item)
                return item
            else:
                if item is None:
                    # create new collection
                    item = Subcollection(self)
                    self._items[pathcomponents[0]] = item
                    self._modified = True
                    self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, SynchronizedCollectionBase):
                    return item.find_or_create(pathcomponents[1], create_type)
                else:
                    raise IOError(errno.ENOTDIR, "Interior path components must be subcollection")
        else:
            return self
    def find(self, path):
        """Recursively search the specified file path.

        May return either a Collection or ArvadosFile.  Return None if not
        found.

        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' must not be empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        if len(pathcomponents) == 1:
            return item
        else:
            if isinstance(item, SynchronizedCollectionBase):
                if pathcomponents[1]:
                    return item.find(pathcomponents[1])
                else:
                    return item
            else:
                raise IOError(errno.ENOTDIR, "Interior path components must be subcollection")
585 """Recursive subcollection create.
587 Like `os.mkdirs()`. Will create intermediate subcollections needed to
588 contain the leaf subcollection path.
591 return self.find_or_create(path, COLLECTION)
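    # Usage sketch (`c` is a hypothetical writable collection):
    #
    #   c.mkdirs('a/b/c')                               # Subcollection at a/b/c
    #   f = c.find_or_create('a/b/c/hello.txt', FILE)   # new empty ArvadosFile
    #   c.find('a/b') is not None                       # True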
    def open(self, path, mode="r"):
        """Open a file-like object for access.

        :path:
          path to a file in the collection

        :mode:
          one of "r", "r+", "w", "w+", "a", "a+"

          :"r":
            opens for reading
          :"r+":
            opens for reading and writing.  Reads/writes share a file pointer.
          :"w", "w+":
            truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
          :"a", "a+":
            opens for reading and writing.  All writes are appended to
            the end of the file.  Writing does not affect the file pointer for
            reading.
        """
        mode = mode.replace("b", "")
        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
            raise errors.ArgumentError("Bad mode '%s'" % mode)
        create = (mode != "r")

        if create and not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")

        if create:
            arvfile = self.find_or_create(path, FILE)
        else:
            arvfile = self.find(path)

        if arvfile is None:
            raise IOError(errno.ENOENT, "File not found")
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Path must refer to a file.")

        name = os.path.basename(path)

        if mode == "r":
            return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
        else:
            return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)
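    # Usage sketch (`c` is a hypothetical writable collection; the
    # returned objects support the context manager protocol):
    #
    #   with c.open('subdir/counts.txt', 'w') as out:
    #       out.write('1 2 3\n')
    #   with c.open('subdir/counts.txt', 'r') as inp:
    #       data = inp.read(1024)    # read up to 1 KiB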
641 """Test if the collection (or any subcollection or file) has been modified."""
644 for k,v in self._items.items():
650 def set_unmodified(self):
651 """Recursively clear modified flag."""
652 self._modified = False
653 for k,v in self._items.items():
658 """Iterate over names of files and collections contained in this collection."""
659 return iter(self._items.keys())
    def __getitem__(self, k):
        """Get a file or collection that is directly contained by this collection.

        If you want to search a path, use `find()` instead.

        """
        return self._items[k]

    def __contains__(self, k):
        """Test if there is a file or collection directly contained by this collection."""
        return k in self._items

    def __len__(self):
        """Get the number of items directly contained in this collection."""
        return len(self._items)

    def __delitem__(self, p):
        """Delete an item by name which is directly contained by this collection."""
        del self._items[p]
        self._modified = True
        self.notify(DEL, self, p, None)

    def keys(self):
        """Get a list of names of files and collections directly contained in this collection."""
        return self._items.keys()

    def values(self):
        """Get a list of files and collection objects directly contained in this collection."""
        return self._items.values()

    def items(self):
        """Get a list of (name, object) tuples directly contained in this collection."""
        return self._items.items()

    def exists(self, path):
        """Test if there is a file or collection at `path`."""
        return self.find(path) is not None
    def remove(self, path, recursive=False):
        """Remove the file or subcollection (directory) at `path`.

        :recursive:
          Specify whether to remove non-empty subcollections (True), or raise an error (False).

        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' must not be empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        if item is None:
            raise IOError(errno.ENOENT, "File not found")
        if len(pathcomponents) == 1:
            if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
            deleteditem = self._items[pathcomponents[0]]
            del self._items[pathcomponents[0]]
            self._modified = True
            self.notify(DEL, self, pathcomponents[0], deleteditem)
        else:
            item.remove(pathcomponents[1])
    def _clonefrom(self, source):
        for k, v in source.items():
            self._items[k] = v.clone(self)

    def clone(self):
        raise NotImplementedError()
    def copy(self, source, target_path, source_collection=None, overwrite=False):
        """Copy a file or subcollection to a new path in this collection.

        :source:
          An ArvadosFile, Subcollection, or string with a path to a source file or subcollection.

        :target_path:
          Destination file or path.  If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection.  If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to copy `source` from (default `self`).

        :overwrite:
          Whether to overwrite the target file if it already exists.

        """
        if source_collection is None:
            source_collection = self

        # Find the object to copy
        if isinstance(source, basestring):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError(errno.ENOENT, "File not found")
            sourcecomponents = source.split("/")
        else:
            source_obj = source
            sourcecomponents = None

        # Find the parent collection of the target path.
        targetcomponents = target_path.split("/")

        # Determine the name to use.
        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)

        if not target_name:
            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")

        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)

        if target_name in target_dir:
            if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sourcecomponents:
                target_dir = target_dir[target_name]
                target_name = sourcecomponents[-1]
            elif not overwrite:
                raise IOError(errno.EEXIST, "File already exists")

        modified_from = None
        if target_name in target_dir:
            modified_from = target_dir[target_name]

        # Actually make the copy.
        dup = source_obj.clone(target_dir)
        target_dir._items[target_name] = dup
        target_dir._modified = True

        if modified_from:
            self.notify(MOD, target_dir, target_name, (modified_from, dup))
        else:
            self.notify(ADD, target_dir, target_name, dup)
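    # Usage sketch (`c` is a hypothetical writable collection that
    # already contains data/input.txt):
    #
    #   c.copy('data/input.txt', 'backup/')           # same name, new dir
    #   c.copy('data/input.txt', 'input-copy.txt')    # renamed copy
    #   c.copy('data/input.txt', 'input-copy.txt', overwrite=True)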
    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name of the stream (directory).

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        """
        if self.modified() or self._manifest_text is None or normalize:
            item = self
            stream = {}
            buf = []
            sorted_keys = sorted(item.keys())
            for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
                # Create a stream entry per file.
                arvfile = item[filename]
                filestream = []
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                                      segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            if stream:
                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
                buf.append(item[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip))
            return "".join(buf)
        else:
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text
    def diff(self, end_collection, prefix=".", holding_collection=None):
        """Generate a list of add/modify/delete actions.

        When given to `apply`, will change `self` to match `end_collection`.

        """
        changes = []
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        for k in self:
            if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
        for k in end_collection:
            if k in self:
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                elif end_collection[k] != self[k]:
                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
            else:
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
        return changes
    def apply(self, changes):
        """Apply changes from `diff`.

        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.

        """
        for change in changes:
            event_type = change[0]
            path = change[1]
            initial = change[2]
            local = self.find(path)
            conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
                                                                    time.gmtime()))
            if event_type == ADD:
                if local is None:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already a local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD:
                final = change[3]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                else:
                    # Local is missing (presumably deleted) or doesn't
                    # match the "initial" value, so save the change to a
                    # conflict file.
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
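    # Sketch of the diff/apply round trip (c1 and c2 are hypothetical
    # Collection objects):
    #
    #   changes = c1.diff(c2)   # list of (ADD|MOD|DEL, path, ...) tuples
    #   c1.apply(changes)       # c1 now matches c2; conflicting local
    #                           # edits are saved as "<path>~conflict-<timestamp>~"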
    def portable_data_hash(self):
        """Get the portable data hash for this collection's manifest."""
        stripped = self.manifest_text(strip=True)
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, SynchronizedCollectionBase):
            return False
        if len(self._items) != len(other):
            return False
        for k in self._items:
            if k not in other:
                return False
            if self._items[k] != other[k]:
                return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)
class Collection(SynchronizedCollectionBase):
    """Represents the root of an Arvados Collection.

    This class is threadsafe.  The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the entire
    collection).

    Brief summary of useful methods:

    :To read an existing file:
      `c.open("myfile", "r")`

    :To write a new file:
      `c.open("myfile", "w")`

    :To determine if a file exists:
      `c.find("myfile") is not None`

    :To copy a file:
      `c.copy("source", "dest")`

    :To delete a file:
      `c.remove("myfile")`

    :To save to an existing collection record:
      `c.save()`

    :To save a new collection record:
      `c.save_new()`

    :To merge remote changes into this object:
      `c.update()`

    Must be associated with an API server Collection record (during
    initialization, or using `save_new`) to use `save` or `update`.

    """
    def __init__(self, manifest_locator_or_text=None,
                 api_client=None,
                 keep_client=None,
                 num_retries=None,
                 parent=None,
                 apiconfig=None,
                 block_manager=None):
        """Collection constructor.

        :manifest_locator_or_text:
          One of: an Arvados collection UUID, a block locator of
          a manifest, raw manifest text, or None (to create an empty collection).
        :parent:
          the parent Collection, may be None.
        :apiconfig:
          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.
        :api_client:
          The API client object to use for requests.  If not specified, create one using `apiconfig`.
        :keep_client:
          the Keep client to use for requests.  If not specified, create one using `apiconfig`.
        :num_retries:
          the number of retries for API and Keep requests.
        :block_manager:
          the block manager to use.  If not specified, create one.

        """
        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client
        self._block_manager = block_manager

        if apiconfig:
            self._config = apiconfig
        else:
            self._config = config.settings()

        self.num_retries = num_retries if num_retries is not None else 0
        self._manifest_locator = None
        self._manifest_text = None
        self._api_response = None

        self.lock = threading.RLock()
        self.callbacks = []

        if manifest_locator_or_text:
            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
            else:
                raise errors.ArgumentError(
                    "Argument to CollectionReader must be a manifest or a collection UUID")

            try:
                self._populate()
            except (IOError, errors.SyntaxError) as e:
                raise errors.ArgumentError("Error processing manifest text: %s" % e)
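    # Construction sketch; the UUID, hash, and manifest are placeholder
    # values:
    #
    #   c = Collection()                                      # empty
    #   c = Collection('zzzzz-4zz18-zzzzzzzzzzzzzzz')         # by UUID
    #   c = Collection('d41d8cd98f00b204e9800998ecf8427e+0')  # by locator
    #   c = Collection('. d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n')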
    def root_collection(self):
        return self

    def stream_name(self):
        return "."
    def update(self, other=None, num_retries=None):
        """Merge the latest collection on the API server with the current collection."""
        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            other = CollectionReader(response["manifest_text"])
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
    def _my_api(self):
        if self._api_client is None:
            self._api_client = ThreadSafeApiCache(self._config)
            self._keep_client = self._api_client.keep
        return self._api_client

    def _my_keep(self):
        if self._keep_client is None:
            if self._api_client is None:
                self._my_api()
            else:
                self._keep_client = KeepClient(api_client=self._api_client)
        return self._keep_client
    def _my_block_manager(self):
        if self._block_manager is None:
            self._block_manager = _BlockManager(self._my_keep())
        return self._block_manager
    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            self._api_response = self._my_api().collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e
    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep.  This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e
    def _populate(self):
        if self._manifest_locator is None and self._manifest_text is None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))

        self._baseline_manifest = self._manifest_text
        self._import_manifest(self._manifest_text)
    def _has_collection_uuid(self):
        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Support scoped auto-commit in a with: block."""
        if exc_type is None:
            if self.writable() and self._has_collection_uuid():
                self.save()
        if self._block_manager is not None:
            self._block_manager.stop_threads()
    def clone(self, new_parent=None, readonly=False, new_config=None):
        if new_config is None:
            new_config = self._config
        if readonly:
            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
        else:
            newcollection = Collection(parent=new_parent, apiconfig=new_config)

        newcollection._clonefrom(self)
        return newcollection
    def api_response(self):
        """Returns information about this Collection fetched from the API server.

        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.

        """
        return self._api_response
    def find_or_create(self, path, create_type):
        """See `SynchronizedCollectionBase.find_or_create`"""
        if path == ".":
            return self
        else:
            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

    def find(self, path):
        """See `SynchronizedCollectionBase.find`"""
        if path == ".":
            return self
        else:
            return super(Collection, self).find(path[2:] if path.startswith("./") else path)

    def remove(self, path, recursive=False):
        """See `SynchronizedCollectionBase.remove`"""
        if path == ".":
            raise errors.ArgumentError("Cannot remove '.'")
        else:
            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
    def save(self, merge=True, num_retries=None):
        """Save collection to an existing collection record.

        Commit pending buffer blocks to Keep, merge with remote record (if
        merge=True), write the manifest to Keep, and update the collection
        record.

        Will raise AssertionError if not associated with a collection record on
        the API server.  If you want to save a manifest to Keep only, see
        `save_new()`.

        :merge:
          Update and merge remote changes before saving.  Otherwise, any
          remote changes will be ignored and overwritten.

        """
        if self.modified():
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
            self._my_block_manager().commit_all()
            if merge:
                self.update()
            self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)

            text = self.manifest_text(strip=False)
            self._api_response = self._my_api().collections().update(
                uuid=self._manifest_locator,
                body={'manifest_text': text}
                ).execute(
                    num_retries=num_retries)
            self._manifest_text = self._api_response["manifest_text"]
            self.set_unmodified()
    def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
        """Save collection to a new collection record.

        Commit pending buffer blocks to Keep, write the manifest to Keep, and
        create a new collection record (if create_collection_record is True).
        After creating a new collection record, this Collection object will be
        associated with the new record for subsequent calls to `save()`.

        :name:
          The collection name.

        :create_collection_record:
          If False, only save the manifest to Keep; do not create a
          collection record.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner.  If
          False, a name conflict will result in an error.

        """
        self._my_block_manager().commit_all()
        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
        text = self.manifest_text(strip=False)

        if create_collection_record:
            if name is None:
                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))

            body = {"manifest_text": text,
                    "name": name}
            if owner_uuid:
                body["owner_uuid"] = owner_uuid

            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
            text = self._api_response["manifest_text"]

            self._manifest_locator = self._api_response["uuid"]

        self._manifest_text = text
        self.set_unmodified()
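    # Save workflow sketch (assumes a writable API connection; the name
    # is illustrative):
    #
    #   c = Collection()
    #   with c.open('out.txt', 'w') as f:
    #       f.write('results')
    #   c.save_new(name='my results')   # create a new collection record
    #   # ...make further changes...
    #   c.save(merge=True)              # update the same record in place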
    def subscribe(self, callback):
        self.callbacks.append(callback)

    def unsubscribe(self, callback):
        self.callbacks.remove(callback)

    def notify(self, event, collection, name, item):
        for c in self.callbacks:
            c(event, collection, name, item)
    def _import_manifest(self, manifest_text):
        """Import a manifest into a `Collection`.

        :manifest_text:
          The manifest text to import from.

        """
        if len(self) > 0:
            raise errors.ArgumentError("Can only import manifest into an empty collection")

        STREAM_NAME = 0
        BLOCKS = 1
        SEGMENTS = 2

        stream_name = None
        state = STREAM_NAME

        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
            tok = token_and_separator.group(1)
            sep = token_and_separator.group(2)

            if state == STREAM_NAME:
                # starting a new stream
                stream_name = tok.replace('\\040', ' ')
                blocks = []
                streamoffset = 0L
                state = BLOCKS
                continue

            if state == BLOCKS:
                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                if block_locator:
                    blocksize = long(block_locator.group(1))
                    blocks.append(Range(tok, streamoffset, blocksize))
                    streamoffset += blocksize
                else:
                    state = SEGMENTS

            if state == SEGMENTS:
                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
                if file_segment:
                    pos = long(file_segment.group(1))
                    size = long(file_segment.group(2))
                    name = file_segment.group(3).replace('\\040', ' ')
                    filepath = os.path.join(stream_name, name)
                    afile = self.find_or_create(filepath, FILE)
                    if isinstance(afile, ArvadosFile):
                        afile.add_segment(blocks, pos, size)
                    else:
                        raise errors.SyntaxError("File %s conflicts with stream of the same name." % filepath)
                else:
                    raise errors.SyntaxError("Invalid manifest format")

            if sep == "\n":
                stream_name = None
                state = STREAM_NAME

        self.set_unmodified()
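    # Parsing walkthrough for the state machine above, on the toy line
    # (the locator is the real md5 of "foo" plus a size hint):
    #
    #   ./dir acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
    #
    # token 1, STREAM_NAME: stream_name = './dir'; state -> BLOCKS
    # token 2, BLOCKS: matches the locator pattern; appends a Range and
    #          advances streamoffset to 3
    # token 3, SEGMENTS: '0:3:foo.txt' creates ./dir/foo.txt covering
    #          bytes 0-3 of the stream; the trailing newline separator
    #          resets state to STREAM_NAME for the next line.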
class Subcollection(SynchronizedCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record.

    It falls under the umbrella of the root collection.

    """

    def __init__(self, parent):
        super(Subcollection, self).__init__(parent)
        self.lock = self.root_collection().lock
        self._manifest_text = None

    def root_collection(self):
        return self.parent.root_collection()

    def writable(self):
        return self.root_collection().writable()

    def _my_api(self):
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def notify(self, event, collection, name, item):
        return self.root_collection().notify(event, collection, name, item)

    def stream_name(self):
        for k, v in self.parent.items():
            if v is self:
                return os.path.join(self.parent.stream_name(), k)
        return '.'

    def clone(self, new_parent):
        c = Subcollection(new_parent)
        c._clonefrom(self)
        return c
class CollectionReader(Collection):
    """A read-only collection object.

    Initialize from an api collection record locator, a portable data hash of a
    manifest, or raw manifest text.  See `Collection` constructor for detailed
    options.

    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()

        # Backwards compatibility with old CollectionReader
        # all_streams() and all_files()
        self._streams = None

    def writable(self):
        return self._in_init
    def _populate_streams(orig_func):
        @functools.wraps(orig_func)
        def populate_streams_wrapper(self, *args, **kwargs):
            # Defer populating self._streams until needed since it creates a copy of the manifest.
            if self._streams is None:
                if self._manifest_text:
                    self._streams = [sline.split()
                                     for sline in self._manifest_text.split("\n")
                                     if sline]
                else:
                    self._streams = []
            return orig_func(self, *args, **kwargs)
        return populate_streams_wrapper
    def normalize(self):
        """Normalize the streams returned by `all_streams`.

        This method is kept for backwards compatibility and only affects the
        behavior of `all_streams()` and `all_files()`.

        """
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f