import functools
import hashlib
import logging
import os
import re
import errno
import time
import threading

from collections import deque
from stat import *

from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
from .keep import KeepLocator, KeepClient
from .stream import StreamReader
from ._normalize_stream import normalize_stream
from ._ranges import Range, LocatorAndRange
from .safeapi import ThreadSafeApiCache
import config
import errors
import util
from arvados.retry import retry_method

_logger = logging.getLogger('arvados.collection')


class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """Get the manifest with locator hints stripped.

        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
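
    # Illustration (not from the original source): stripping removes
    # non-portable hints such as a "+A<signature>@<expiry>" permission
    # token, while the "+<size>" hint is kept.  For example,
    #   acbd18db4cc2f85cedef654fccc4a4d8+3+Acafe...@53bed294
    # becomes
    #   acbd18db4cc2f85cedef654fccc4a4d8+3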


class _WriterFile(_FileLikeObjectBase):
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        self.dest.write(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.dest.flush_data()


class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self.replication = (2 if replication is None else replication)
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
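
    # A minimal usage sketch (illustrative; assumes a configured Arvados
    # environment and that ``page1_data`` holds the bytes to store):
    #
    #   cwriter = CollectionWriter()
    #   with cwriter.open('./doc/page1.txt') as outfile:
    #       outfile.write(page1_data)
    #   manifest = cwriter.manifest_text()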

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
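
    # Illustrative sketch (paths are placeholders): write one file, then a
    # whole directory tree as its own stream, and collect the manifest.
    #
    #   cwriter.write_file('/tmp/data.bin')
    #   cwriter.write_directory_tree('/tmp/results', stream_name='./results')
    #   manifest = cwriter.manifest_text()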

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)

        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(
                    data_buffer[0:config.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
339 """Store the manifest in Keep and return its locator.
341 This is useful for storing manifest fragments (task outputs)
342 temporarily in Keep during a Crunch job.
344 In other cases you should make a collection instead, by
345 sending manifest_text() to the API server's "create
346 collection" endpoint.
348 return self._my_keep().put(self.manifest_text(), copies=self.replication)

    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
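
    # Illustration: the portable data hash is the MD5 of the stripped
    # manifest joined with the manifest's length, "<md5hex>+<length>".
    # E.g. a 255-byte stripped manifest whose MD5 digest is
    # 9dd4e461268c8034f5c8564e155c67a6 (values hypothetical) yields
    # "9dd4e461268c8034f5c8564e155c67a6+255".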

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest
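
    # Sketch of the manifest line format emitted above (locator and sizes
    # are hypothetical).  Each finished stream becomes one line:
    #   <stream name> <block locator...> <file token...>
    # where each file token is "<position>:<size>:<name>", e.g.
    #   ./results 930625b054ce894ac40596c3f5a0d947+1000 0:600:a.txt 600:400:b.txt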

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            for locator in locators:
                ret.append(locator)
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state
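
    # Checkpoint/resume sketch (illustrative; names are placeholders and
    # error handling is omitted):
    #
    #   writer = ResumableCollectionWriter()
    #   writer.write_file('/tmp/big.dat')
    #   state = writer.dump_state(copy.deepcopy)    # snapshot progress
    #   ...process dies, restarts...
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()                     # finish remaining work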

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)


ADD = "add"
DEL = "del"
MOD = "mod"
FILE = "file"
COLLECTION = "collection"


class RichCollectionBase(CollectionBase):
    """Base class for Collections and Subcollections.

    Implements the majority of functionality relating to accessing items in the
    collection.

    """

    def __init__(self, parent=None):
        self.parent = parent
        self._modified = True
        self._callback = None
        self._items = {}

    def writable(self):
        raise NotImplementedError()

    def _my_api(self):
        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

    def _my_keep(self):
        raise NotImplementedError()

    def root_collection(self):
        raise NotImplementedError()

    def notify(self, event, collection, name, item):
        raise NotImplementedError()

    def stream_name(self):
        raise NotImplementedError()

    @must_be_writable
    @synchronized
    def find_or_create(self, path, create_type):
        """Recursively search the specified file path.

        May return either a `Collection` or `ArvadosFile`.  If not found, will
        create a new item at the specified path based on `create_type`.  Will
        create intermediate subcollections needed to contain the final item in
        the path.

        :create_type:
          One of `arvados.collection.FILE` or
          `arvados.collection.COLLECTION`.  If the path is not found, and value
          of create_type is FILE then create and return a new ArvadosFile for
          the last path component.  If COLLECTION, then create and return a new
          Collection for the last path component.

        """
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                if item is None:
                    # create new file
                    if create_type == COLLECTION:
                        item = Subcollection(self, pathcomponents[0])
                    else:
                        item = ArvadosFile(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self._modified = True
                    self.notify(ADD, self, pathcomponents[0], item)
                return item
            else:
                if item is None:
                    # create new collection
                    item = Subcollection(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self._modified = True
                    self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    return item.find_or_create(pathcomponents[1], create_type)
                else:
                    raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
        else:
            return self

    @synchronized
    def find(self, path):
        """Recursively search the specified file path.

        May return either a Collection or ArvadosFile.  Return None if not
        found.

        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' must not be empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        if len(pathcomponents) == 1:
            return item
        else:
            if isinstance(item, RichCollectionBase):
                if pathcomponents[1]:
                    return item.find(pathcomponents[1])
                else:
                    return item
            else:
                raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
587 """Recursive subcollection create.
589 Like `os.mkdirs()`. Will create intermediate subcollections needed to
590 contain the leaf subcollection path.
593 return self.find_or_create(path, COLLECTION)

    def open(self, path, mode="r"):
        """Open a file-like object for access.

        :path:
          path to a file in the collection
        :mode:
          one of "r", "r+", "w", "w+", "a", "a+"
          :"r":
            opens for reading
          :"r+":
            opens for reading and writing.  Reads/writes share a file pointer.
          :"w", "w+":
            truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
          :"a", "a+":
            opens for reading and writing.  All writes are appended to
            the end of the file.  Writing does not affect the file pointer for
            reading.
        """
        mode = mode.replace("b", "")
        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
            raise errors.ArgumentError("Bad mode '%s'" % mode)
        create = (mode != "r")

        if create and not self.writable():
            raise IOError((errno.EROFS, "Collection is read only"))

        if create:
            arvfile = self.find_or_create(path, FILE)
        else:
            arvfile = self.find(path)

        if arvfile is None:
            raise IOError((errno.ENOENT, "File not found"))
        if not isinstance(arvfile, ArvadosFile):
            raise IOError((errno.EISDIR, "Path must refer to a file."))

        if mode[0] == "w":
            arvfile.truncate(0)

        name = os.path.basename(path)

        if mode == "r":
            return ArvadosFileReader(arvfile, mode, num_retries=self.num_retries)
        else:
            return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
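
    # Illustrative sketch (path is a placeholder): write a file inside the
    # collection, then read it back.
    #
    #   with c.open("results/output.txt", "w") as f:
    #       f.write("hello\n")
    #   with c.open("results/output.txt", "r") as f:
    #       data = f.read()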
643 """Test if the collection (or any subcollection or file) has been modified."""
646 for k,v in self._items.items():
652 def set_unmodified(self):
653 """Recursively clear modified flag."""
654 self._modified = False
655 for k,v in self._items.items():
660 """Iterate over names of files and collections contained in this collection."""
661 return iter(self._items.keys())

    @synchronized
    def __getitem__(self, k):
        """Get a file or collection that is directly contained by this collection.

        If you want to search a path, use `find()` instead.

        """
        return self._items[k]

    @synchronized
    def __contains__(self, k):
        """Test if there is a file or collection directly contained by this collection."""
        return k in self._items

    @synchronized
    def __len__(self):
        """Get the number of items directly contained in this collection."""
        return len(self._items)

    @must_be_writable
    @synchronized
    def __delitem__(self, p):
        """Delete an item by name which is directly contained by this collection."""
        del self._items[p]
        self._modified = True
        self.notify(DEL, self, p, None)
692 """Get a list of names of files and collections directly contained in this collection."""
693 return self._items.keys()
697 """Get a list of files and collection objects directly contained in this collection."""
698 return self._items.values()
702 """Get a list of (name, object) tuples directly contained in this collection."""
703 return self._items.items()
705 def exists(self, path):
706 """Test if there is a file or collection at `path`."""
707 return self.find(path) is not None

    @must_be_writable
    @synchronized
    def remove(self, path, recursive=False):
        """Remove the file or subcollection (directory) at `path`.

        :recursive:
          Specify whether to remove non-empty subcollections (True), or raise an error (False).
        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' must not be empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        if item is None:
            raise IOError((errno.ENOENT, "File not found"))
        if len(pathcomponents) == 1:
            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
            deleteditem = self._items[pathcomponents[0]]
            del self._items[pathcomponents[0]]
            self._modified = True
            self.notify(DEL, self, pathcomponents[0], deleteditem)
        else:
            item.remove(pathcomponents[1])

    def _clonefrom(self, source):
        for k,v in source.items():
            self._items[k] = v.clone(self, k)

    def clone(self):
        raise NotImplementedError()

    @must_be_writable
    @synchronized
    def add(self, source_obj, target_name, overwrite=False):
        """Copy a file or subcollection to this collection.

        :source_obj:
          An ArvadosFile, or Subcollection object

        :target_name:
          Destination item name.  If the target name already exists and is a
          file, this will raise an error unless you specify `overwrite=True`.

        :overwrite:
          Whether to overwrite target file if it already exists.

        """
        if target_name in self and not overwrite:
            raise IOError((errno.EEXIST, "File already exists"))

        modified_from = None
        if target_name in self:
            modified_from = self[target_name]

        # Actually make the copy.
        dup = source_obj.clone(self, target_name)
        self._items[target_name] = dup
        self._modified = True

        if modified_from:
            self.notify(MOD, self, target_name, (modified_from, dup))
        else:
            self.notify(ADD, self, target_name, dup)

    @must_be_writable
    @synchronized
    def copy(self, source, target_path, source_collection=None, overwrite=False):
        """Copy a file or subcollection to a new path in this collection.

        :source:
          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.

        :target_path:
          Destination file or path.  If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection.  If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to copy `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.
        """
        if source_collection is None:
            source_collection = self

        # Find the object to copy
        if isinstance(source, basestring):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError((errno.ENOENT, "File not found"))
            sourcecomponents = source.split("/")
        else:
            source_obj = source
            sourcecomponents = None

        # Find the parent collection of the target path
        targetcomponents = target_path.split("/")

        # Determine the name to use.
        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)

        if not target_name:
            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")

        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)

        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        target_dir.add(source_obj, target_name, overwrite)
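
    # Illustrative sketch (paths are placeholders):
    #
    #   c.copy("input/a.txt", "backup/")            # keeps the name "a.txt"
    #   c.copy("input/a.txt", "b.txt")              # copies under a new name
    #   c.copy("a.txt", "b.txt", overwrite=True)    # replaces existing b.txt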

    @synchronized
    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name of the stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        """
        if self.modified() or self._manifest_text is None or normalize:
            stream = {}
            buf = []
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `filename`
                arvfile = self[filename]
                filestream = []
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                                      segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            if stream:
                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip))
            return "".join(buf)
        else:
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text

    @synchronized
    def diff(self, end_collection, prefix=".", holding_collection=None):
        """Generate list of add/modify/delete actions.

        When given to `apply`, will change `self` to match `end_collection`.

        """
        changes = []
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        for k in self:
            if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
            if k in self:
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                elif end_collection[k] != self[k]:
                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            else:
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
        return changes

    @must_be_writable
    @synchronized
    def apply(self, changes):
        """Apply changes from `diff`.

        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.

        """
        for change in changes:
            event_type = change[0]
            path = change[1]
            initial = change[2]
            local = self.find(path)
            conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
                                                                    time.gmtime()))
            if event_type == ADD:
                if local is None:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD:
                final = change[3]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                else:
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
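
    # Sketch of a diff/apply round trip (illustrative; ``c1`` and ``c2`` are
    # Collection objects):
    #
    #   changes = c1.diff(c2)    # actions that would turn c1 into c2
    #   c1.apply(changes)        # c1 now matches c2, except that conflicting
    #                            # local changes are saved to "~conflict-" files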

    def portable_data_hash(self):
        """Get the portable data hash for this collection's manifest."""
        stripped = self.manifest_text(strip=True)
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    @synchronized
    def subscribe(self, callback):
        if self._callback is None:
            self._callback = callback
        else:
            raise errors.ArgumentError("A callback is already set on this collection.")

    @synchronized
    def unsubscribe(self):
        if self._callback is not None:
            self._callback = None

    @synchronized
    def notify(self, event, collection, name, item):
        if self._callback:
            self._callback(event, collection, name, item)
        self.root_collection().notify(event, collection, name, item)

    @synchronized
    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, RichCollectionBase):
            return False
        if len(self._items) != len(other):
            return False
        for k in self._items:
            if k not in other:
                return False
            if self._items[k] != other[k]:
                return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)


class Collection(RichCollectionBase):
    """Represents the root of an Arvados Collection.

    This class is threadsafe.  The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the entire
    collection).

    Brief summary of useful methods:

    :To read an existing file:
      `c.open("myfile", "r")`

    :To write a new file:
      `c.open("myfile", "w")`

    :To determine if a file exists:
      `c.find("myfile") is not None`

    :To copy a file:
      `c.copy("source", "dest")`

    :To delete a file:
      `c.remove("myfile")`

    :To save to an existing collection record:
      `c.save()`

    :To save a new collection record:
      `c.save_new()`

    :To merge remote changes into this object:
      `c.update()`

    Must be associated with an API server Collection record (during
    initialization, or using `save_new`) to use `save` or `update`.

    """

    def __init__(self, manifest_locator_or_text=None,
                 api_client=None,
                 keep_client=None,
                 num_retries=None,
                 parent=None,
                 apiconfig=None,
                 block_manager=None):
        """Collection constructor.

        :manifest_locator_or_text:
          One of Arvados collection UUID, block locator of
          a manifest, raw manifest text, or None (to create an empty collection).
        :parent:
          the parent Collection, may be None.
        :apiconfig:
          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.
        :api_client:
          The API client object to use for requests.  If not specified, create one using `apiconfig`.
        :keep_client:
          the Keep client to use for requests.  If not specified, create one using `apiconfig`.
        :num_retries:
          the number of retries for API and Keep requests.
        :block_manager:
          the block manager to use.  If not specified, create one.

        """
        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client
        self._block_manager = block_manager

        if apiconfig:
            self._config = apiconfig
        else:
            self._config = config.settings()

        self.num_retries = num_retries if num_retries is not None else 0
        self._manifest_locator = None
        self._manifest_text = None
        self._api_response = None

        self.lock = threading.RLock()

        if manifest_locator_or_text:
            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
            else:
                raise errors.ArgumentError(
                    "Argument to CollectionReader must be a manifest or a collection UUID")

            try:
                self._populate()
            except (IOError, errors.SyntaxError) as e:
                raise errors.ArgumentError("Error processing manifest text: %s" % e)
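
    # Illustrative usage sketch (the UUID is a placeholder):
    #
    #   c = Collection("zzzzz-4zz18-xxxxxxxxxxxxxxx")
    #   with c.open("myfile", "w") as f:
    #       f.write("data")
    #   c.save()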

    def root_collection(self):
        return self

    def stream_name(self):
        return "."

    def writable(self):
        return True

    @synchronized
    @retry_method
    def update(self, other=None, num_retries=None):
        """Merge the latest collection on the API server with the current collection."""
        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            other = CollectionReader(response["manifest_text"])
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))

    @synchronized
    def _my_api(self):
        if self._api_client is None:
            self._api_client = ThreadSafeApiCache(self._config)
            self._keep_client = self._api_client.keep
        return self._api_client

    @synchronized
    def _my_keep(self):
        if self._keep_client is None:
            if self._api_client is None:
                self._my_api()
            else:
                self._keep_client = KeepClient(api_client=self._api_client)
        return self._keep_client

    @synchronized
    def _my_block_manager(self):
        if self._block_manager is None:
            self._block_manager = _BlockManager(self._my_keep())
        return self._block_manager

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            self._api_response = self._my_api().collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep.  This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        if self._manifest_locator is None and self._manifest_text is None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))

        self._baseline_manifest = self._manifest_text
        self._import_manifest(self._manifest_text)

    @synchronized
    def _has_collection_uuid(self):
        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Support scoped auto-commit in a with: block."""
        if exc_type is None:
            if self.writable() and self._has_collection_uuid():
                self.save()
        if self._block_manager is not None:
            self._block_manager.stop_threads()

    @synchronized
    def manifest_locator(self):
        """Get the manifest locator, if any.

        The manifest locator will be set when the collection is loaded from an
        API server record or the portable data hash of a manifest.

        The manifest locator will be None if the collection is newly created or
        was created directly from manifest text.  The method `save_new()` will
        assign a manifest locator.

        """
        return self._manifest_locator

    @synchronized
    def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
        if new_config is None:
            new_config = self._config
        if readonly:
            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
        else:
            newcollection = Collection(parent=new_parent, apiconfig=new_config)

        newcollection._clonefrom(self)
        return newcollection

    @synchronized
    def api_response(self):
        """Returns information about this Collection fetched from the API server.

        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.

        """
        return self._api_response

    @synchronized
    def find_or_create(self, path, create_type):
        """See `RichCollectionBase.find_or_create`"""
        if path == ".":
            return self
        else:
            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

    @synchronized
    def find(self, path):
        """See `RichCollectionBase.find`"""
        if path == ".":
            return self
        else:
            return super(Collection, self).find(path[2:] if path.startswith("./") else path)

    @synchronized
    def remove(self, path, recursive=False):
        """See `RichCollectionBase.remove`"""
        if path == ".":
            raise errors.ArgumentError("Cannot remove '.'")
        else:
            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)

    @must_be_writable
    @synchronized
    @retry_method
    def save(self, merge=True, num_retries=None):
        """Save collection to an existing collection record.

        Commit pending buffer blocks to Keep, merge with remote record (if
        merge=True, the default), write the manifest to Keep, and update the
        collection record.

        Will raise AssertionError if not associated with a collection record on
        the API server.  If you want to save a manifest to Keep only, see
        `save_new()`.

        :merge:
          Update and merge remote changes before saving.  Otherwise, any
          remote changes will be ignored and overwritten.

        :num_retries:
          Retry count on API calls (if None, use the collection default)

        """
        if self.modified():
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
            self._my_block_manager().commit_all()
            if merge:
                self.update()
            self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)

            text = self.manifest_text(strip=False)
            self._api_response = self._my_api().collections().update(
                uuid=self._manifest_locator,
                body={'manifest_text': text}
                ).execute(
                    num_retries=num_retries)
            self._manifest_text = self._api_response["manifest_text"]
            self.set_unmodified()
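
    # Illustrative sketch: by default save() first merges remote changes
    # (merge=True); pass merge=False to overwrite the remote record with
    # this object's state instead.
    #
    #   c.save()             # update record, merging remote changes
    #   c.save(merge=False)  # update record, ignoring remote changes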

    @must_be_writable
    @synchronized
    @retry_method
    def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
        """Save collection to a new collection record.

        Commit pending buffer blocks to Keep, write the manifest to Keep, and
        create a new collection record (if create_collection_record True).
        After creating a new collection record, this Collection object will be
        associated with the new record used by `save()`.

        :name:
          The collection name.

        :create_collection_record:
          If True, create a collection record.  If False, only save the manifest to keep.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner.  If
          False, a name conflict will result in an error.

        :num_retries:
          Retry count on API calls (if None, use the collection default)

        """
        self._my_block_manager().commit_all()
        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
        text = self.manifest_text(strip=False)

        if create_collection_record:
            if name is None:
                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))

            body = {"manifest_text": text,
                    "name": name}
            if owner_uuid:
                body["owner_uuid"] = owner_uuid

            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
            text = self._api_response["manifest_text"]

            self._manifest_locator = self._api_response["uuid"]

        self._manifest_text = text
        self.set_unmodified()

    @synchronized
    def _import_manifest(self, manifest_text):
        """Import a manifest into a `Collection`.

        :manifest_text:
          The manifest text to import from.

        """
        if len(self) > 0:
            raise errors.ArgumentError("Can only import manifest into an empty collection")

        STREAM_NAME = 0
        BLOCKS = 1
        SEGMENTS = 2

        stream_name = None
        state = STREAM_NAME

        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
            tok = token_and_separator.group(1)
            sep = token_and_separator.group(2)

            if state == STREAM_NAME:
                # starting a new stream
                stream_name = tok.replace('\\040', ' ')
                blocks = []
                streamoffset = 0L
                state = BLOCKS
                continue

            if state == BLOCKS:
                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                if block_locator:
                    blocksize = long(block_locator.group(1))
                    blocks.append(Range(tok, streamoffset, blocksize, 0))
                    streamoffset += blocksize
                else:
                    state = SEGMENTS

            if state == SEGMENTS:
                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
                if file_segment:
                    pos = long(file_segment.group(1))
                    size = long(file_segment.group(2))
                    name = file_segment.group(3).replace('\\040', ' ')
                    filepath = os.path.join(stream_name, name)
                    afile = self.find_or_create(filepath, FILE)
                    if isinstance(afile, ArvadosFile):
                        afile.add_segment(blocks, pos, size)
                    else:
                        raise errors.SyntaxError("File %s conflicts with stream of the same name." % filepath)
                else:
                    raise errors.SyntaxError("Invalid manifest format")

            if sep == "\n":
                stream_name = None
                state = STREAM_NAME

        self.set_unmodified()
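
    # Sketch of the parser states on a single manifest line (values are
    # hypothetical):
    #
    #   . 930625b054ce894ac40596c3f5a0d947+1000 0:1000:data.bin
    #
    # The leading token is consumed in the STREAM_NAME state, block
    # locators in BLOCKS, and "position:size:name" tokens in SEGMENTS; a
    # newline separator returns the parser to STREAM_NAME.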

    @synchronized
    def notify(self, event, collection, name, item):
        if self._callback:
            self._callback(event, collection, name, item)


class Subcollection(RichCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record.

    It falls under the umbrella of the root collection.

    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        self.lock = self.root_collection().lock
        self._manifest_text = None
        self.name = name

    def root_collection(self):
        return self.parent.root_collection()

    def writable(self):
        return self.root_collection().writable()

    def _my_api(self):
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def stream_name(self):
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(self, new_parent, new_name):
        c = Subcollection(new_parent, new_name)
        c._clonefrom(self)
        return c


class CollectionReader(Collection):
    """A read-only collection object.

    Initialize from an api collection record locator, a portable data hash of a
    manifest, or raw manifest text.  See `Collection` constructor for detailed
    options.

    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()

        # Backwards compatibility with old CollectionReader
        # all_streams() and all_files()
        self._streams = None

    def writable(self):
        return self._in_init

    def _populate_streams(orig_func):
        @functools.wraps(orig_func)
        def populate_streams_wrapper(self, *args, **kwargs):
            # Defer populating self._streams until needed since it creates a copy of the manifest.
            if self._streams is None:
                if self._manifest_text:
                    self._streams = [sline.split()
                                     for sline in self._manifest_text.split("\n")
                                     if sline]
                else:
                    self._streams = []
            return orig_func(self, *args, **kwargs)
        return populate_streams_wrapper

    @_populate_streams
    def normalize(self):
        """Normalize the streams returned by `all_streams`.

        This method is kept for backwards compatibility and only affects the
        behavior of `all_streams()` and `all_files()`.

        """
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

    @_populate_streams
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    @_populate_streams
    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f