X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/31d76600cdb691251d0823cc6be601d958b4e1a4..3b12ef6b6d7ff6852f6109ab71dbec382322a686:/sdk/python/arvados/collection.py diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index f735b9ed37..77312e4d49 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -1,3 +1,8 @@ +from __future__ import absolute_import +from future.utils import listitems, listvalues, viewkeys +from builtins import str +from past.builtins import basestring +from builtins import object import functools import logging import os @@ -11,15 +16,15 @@ from collections import deque from stat import * from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock -from keep import KeepLocator, KeepClient +from .keep import KeepLocator, KeepClient from .stream import StreamReader from ._normalize_stream import normalize_stream from ._ranges import Range, LocatorAndRange from .safeapi import ThreadSafeApiCache -import config -import errors -import util -import events +import arvados.config as config +import arvados.errors as errors +import arvados.util +import arvados.events as events from arvados.retry import retry_method _logger = logging.getLogger('arvados.collection') @@ -51,7 +56,7 @@ class CollectionBase(object): if fields: clean_fields = fields[:1] + [ (re.sub(r'\+[^\d][^\+]*', '', x) - if re.match(util.keep_locator_pattern, x) + if re.match(arvados.util.keep_locator_pattern, x) else x) for x in fields[1:]] clean += [' '.join(clean_fields), "\n"] @@ -180,7 +185,7 @@ class CollectionWriter(CollectionBase): def _work_trees(self): path, stream_name, max_manifest_depth = self._queued_trees[0] - d = util.listdir_recursive( + d = arvados.util.listdir_recursive( path, max_depth = (None if max_manifest_depth == 0 else 0)) if d: self._queue_dirents(stream_name, d) @@ -216,7 +221,11 @@ class CollectionWriter(CollectionBase): self.do_queued_work() def write(self, newdata): - if hasattr(newdata, '__iter__'): + if isinstance(newdata, bytes): + pass + elif isinstance(newdata, str): + newdata = newdata.encode() + elif hasattr(newdata, '__iter__'): for s in newdata: self.write(s) return @@ -256,7 +265,7 @@ class CollectionWriter(CollectionBase): return self._last_open def flush_data(self): - data_buffer = ''.join(self._data_buffer) + data_buffer = b''.join(self._data_buffer) if data_buffer: self._current_stream_locators.append( self._my_keep().put( @@ -346,11 +355,12 @@ class CollectionWriter(CollectionBase): sending manifest_text() to the API server's "create collection" endpoint. """ - return self._my_keep().put(self.manifest_text(), copies=self.replication) + return self._my_keep().put(self.manifest_text().encode(), + copies=self.replication) def portable_data_hash(self): - stripped = self.stripped_manifest() - return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped)) + stripped = self.stripped_manifest().encode() + return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) def manifest_text(self): self.finish_current_stream() @@ -418,7 +428,7 @@ class ResumableCollectionWriter(CollectionWriter): return writer def check_dependencies(self): - for path, orig_stat in self._dependencies.items(): + for path, orig_stat in listitems(self._dependencies): if not S_ISREG(orig_stat[ST_MODE]): raise errors.StaleWriterStateError("{} not file".format(path)) try: @@ -544,7 +554,7 @@ class RichCollectionBase(CollectionBase): else: item = ArvadosFile(self, pathcomponents[0]) self._items[pathcomponents[0]] = item - self._committed = False + self.set_committed(False) self.notify(ADD, self, pathcomponents[0], item) return item else: @@ -552,7 +562,7 @@ class RichCollectionBase(CollectionBase): # create new collection item = Subcollection(self, pathcomponents[0]) self._items[pathcomponents[0]] = item - self._committed = False + self.set_committed(False) self.notify(ADD, self, pathcomponents[0], item) if isinstance(item, RichCollectionBase): return item.find_or_create(pathcomponents[1], create_type) @@ -612,7 +622,12 @@ class RichCollectionBase(CollectionBase): :path: path to a file in the collection :mode: - one of "r", "r+", "w", "w+", "a", "a+" + a string consisting of "r", "w", or "a", optionally followed + by "b" or "t", optionally followed by "+". + :"b": + binary mode: write() accepts bytes, read() returns bytes. + :"t": + text mode (default): write() accepts strings, read() returns strings. :"r": opens for reading :"r+": @@ -624,33 +639,28 @@ class RichCollectionBase(CollectionBase): the end of the file. Writing does not affect the file pointer for reading. """ - mode = mode.replace("b", "") - if len(mode) == 0 or mode[0] not in ("r", "w", "a"): - raise errors.ArgumentError("Bad mode '%s'" % mode) - create = (mode != "r") - if create and not self.writable(): - raise IOError(errno.EROFS, "Collection is read only") + if not re.search(r'^[rwa][bt]?\+?$', mode): + raise errors.ArgumentError("Invalid mode {!r}".format(mode)) - if create: - arvfile = self.find_or_create(path, FILE) - else: + if mode[0] == 'r' and '+' not in mode: + fclass = ArvadosFileReader arvfile = self.find(path) + elif not self.writable(): + raise IOError(errno.EROFS, "Collection is read only") + else: + fclass = ArvadosFileWriter + arvfile = self.find_or_create(path, FILE) if arvfile is None: raise IOError(errno.ENOENT, "File not found", path) if not isinstance(arvfile, ArvadosFile): raise IOError(errno.EISDIR, "Is a directory", path) - if mode[0] == "w": + if mode[0] == 'w': arvfile.truncate(0) - name = os.path.basename(path) - - if mode == "r": - return ArvadosFileReader(arvfile, num_retries=self.num_retries) - else: - return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries) + return fclass(arvfile, mode=mode, num_retries=self.num_retries) def modified(self): """Determine if the collection has been modified since last commited.""" @@ -659,25 +669,31 @@ class RichCollectionBase(CollectionBase): @synchronized def committed(self): """Determine if the collection has been committed to the API server.""" - - if self._committed is False: - return False - for v in self._items.values(): - if v.committed() is False: - return False - return True + return self._committed @synchronized - def set_committed(self): - """Recursively set committed flag to True.""" - self._committed = True - for k,v in self._items.items(): - v.set_committed() + def set_committed(self, value=True): + """Recursively set committed flag. + + If value is True, set committed to be True for this and all children. + + If value is False, set committed to be False for this and all parents. + """ + if value == self._committed: + return + if value: + for k,v in listitems(self._items): + v.set_committed(True) + self._committed = True + else: + self._committed = False + if self.parent is not None: + self.parent.set_committed(False) @synchronized def __iter__(self): """Iterate over names of files and collections contained in this collection.""" - return iter(self._items.keys()) + return iter(viewkeys(self._items)) @synchronized def __getitem__(self, k): @@ -703,7 +719,7 @@ class RichCollectionBase(CollectionBase): def __delitem__(self, p): """Delete an item by name which is directly contained by this collection.""" del self._items[p] - self._committed = False + self.set_committed(False) self.notify(DEL, self, p, None) @synchronized @@ -714,12 +730,12 @@ class RichCollectionBase(CollectionBase): @synchronized def values(self): """Get a list of files and collection objects directly contained in this collection.""" - return self._items.values() + return listvalues(self._items) @synchronized def items(self): """Get a list of (name, object) tuples directly contained in this collection.""" - return self._items.items() + return listitems(self._items) def exists(self, path): """Test if there is a file or collection at `path`.""" @@ -746,13 +762,13 @@ class RichCollectionBase(CollectionBase): raise IOError(errno.ENOTEMPTY, "Directory not empty", path) deleteditem = self._items[pathcomponents[0]] del self._items[pathcomponents[0]] - self._committed = False + self.set_committed(False) self.notify(DEL, self, pathcomponents[0], deleteditem) else: item.remove(pathcomponents[1]) def _clonefrom(self, source): - for k,v in source.items(): + for k,v in listitems(source): self._items[k] = v.clone(self, k) def clone(self): @@ -795,7 +811,7 @@ class RichCollectionBase(CollectionBase): item = source_obj.clone(self, target_name) self._items[target_name] = item - self._committed = False + self.set_committed(False) if modified_from: self.notify(MOD, self, target_name, (modified_from, item)) @@ -1022,7 +1038,7 @@ class RichCollectionBase(CollectionBase): """ if changes: - self._committed = False + self.set_committed(False) for change in changes: event_type = change[0] path = change[1] @@ -1068,8 +1084,8 @@ class RichCollectionBase(CollectionBase): # then return API server's PDH response. return self._portable_data_hash else: - stripped = self.portable_manifest_text() - return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped)) + stripped = self.portable_manifest_text().encode() + return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) @synchronized def subscribe(self, callback): @@ -1110,7 +1126,7 @@ class RichCollectionBase(CollectionBase): @synchronized def flush(self): """Flush bufferblocks to Keep.""" - for e in self.values(): + for e in listvalues(self): e.flush() @@ -1216,11 +1232,11 @@ class Collection(RichCollectionBase): self.events = None if manifest_locator_or_text: - if re.match(util.keep_locator_pattern, manifest_locator_or_text): + if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text): self._manifest_locator = manifest_locator_or_text - elif re.match(util.collection_uuid_pattern, manifest_locator_or_text): + elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): self._manifest_locator = manifest_locator_or_text - elif re.match(util.manifest_pattern, manifest_locator_or_text): + elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): self._manifest_text = manifest_locator_or_text else: raise errors.ArgumentError( @@ -1310,6 +1326,7 @@ class Collection(RichCollectionBase): uuid=self._manifest_locator).execute( num_retries=self.num_retries)) self._manifest_text = self._api_response['manifest_text'] + self._portable_data_hash = self._api_response['portable_data_hash'] # If not overriden via kwargs, we should try to load the # replication_desired from the API server if self.replication_desired is None: @@ -1325,7 +1342,7 @@ class Collection(RichCollectionBase): # mode. Return an exception, or None if successful. try: self._manifest_text = self._my_keep().get( - self._manifest_locator, num_retries=self.num_retries) + self._manifest_locator, num_retries=self.num_retries).decode() except Exception as e: return e @@ -1335,10 +1352,10 @@ class Collection(RichCollectionBase): error_via_api = None error_via_keep = None should_try_keep = ((self._manifest_text is None) and - util.keep_locator_pattern.match( + arvados.util.keep_locator_pattern.match( self._manifest_locator)) if ((self._manifest_text is None) and - util.signed_locator_pattern.match(self._manifest_locator)): + arvados.util.signed_locator_pattern.match(self._manifest_locator)): error_via_keep = self._populate_from_keep() if self._manifest_text is None: error_via_api = self._populate_from_api_server() @@ -1364,7 +1381,7 @@ class Collection(RichCollectionBase): def _has_collection_uuid(self): - return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator) + return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator) def __enter__(self): return self @@ -1476,7 +1493,7 @@ class Collection(RichCollectionBase): num_retries=num_retries)) self._manifest_text = self._api_response["manifest_text"] self._portable_data_hash = self._api_response["portable_data_hash"] - self.set_committed() + self.set_committed(True) return self._manifest_text @@ -1537,7 +1554,7 @@ class Collection(RichCollectionBase): self._portable_data_hash = self._api_response["portable_data_hash"] self._manifest_text = text - self.set_committed() + self.set_committed(True) return text @@ -1568,7 +1585,7 @@ class Collection(RichCollectionBase): stream_name = tok.replace('\\040', ' ') blocks = [] segments = [] - streamoffset = 0L + streamoffset = 0 state = BLOCKS self.find_or_create(stream_name, COLLECTION) continue @@ -1576,7 +1593,7 @@ class Collection(RichCollectionBase): if state == BLOCKS: block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok) if block_locator: - blocksize = long(block_locator.group(1)) + blocksize = int(block_locator.group(1)) blocks.append(Range(tok, streamoffset, blocksize, 0)) streamoffset += blocksize else: @@ -1585,8 +1602,8 @@ class Collection(RichCollectionBase): if state == SEGMENTS: file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok) if file_segment: - pos = long(file_segment.group(1)) - size = long(file_segment.group(2)) + pos = int(file_segment.group(1)) + size = int(file_segment.group(2)) name = file_segment.group(3).replace('\\040', ' ') filepath = os.path.join(stream_name, name) afile = self.find_or_create(filepath, FILE) @@ -1602,7 +1619,7 @@ class Collection(RichCollectionBase): stream_name = None state = STREAM_NAME - self.set_committed() + self.set_committed(True) @synchronized def notify(self, event, collection, name, item): @@ -1652,7 +1669,7 @@ class Subcollection(RichCollectionBase): @must_be_writable @synchronized def _reparent(self, newparent, newname): - self._committed = False + self.set_committed(False) self.flush() self.parent.remove(self.name, recursive=True) self.parent = newparent