X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b7f4b12d8722609fbd607a75d317dd60b93497b7..446e64dc143148e52a126fa502bf63299c94197e:/sdk/python/arvados/collection.py diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 098a11e1bb..7f78365fb5 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -1,3 +1,12 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import absolute_import +from future.utils import listitems, listvalues, viewkeys +from builtins import str +from past.builtins import basestring +from builtins import object import functools import logging import os @@ -11,15 +20,15 @@ from collections import deque from stat import * from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock -from keep import KeepLocator, KeepClient +from .keep import KeepLocator, KeepClient from .stream import StreamReader from ._normalize_stream import normalize_stream from ._ranges import Range, LocatorAndRange from .safeapi import ThreadSafeApiCache -import config -import errors -import util -import events +import arvados.config as config +import arvados.errors as errors +import arvados.util +import arvados.events as events from arvados.retry import retry_method _logger = logging.getLogger('arvados.collection') @@ -51,7 +60,7 @@ class CollectionBase(object): if fields: clean_fields = fields[:1] + [ (re.sub(r'\+[^\d][^\+]*', '', x) - if re.match(util.keep_locator_pattern, x) + if re.match(arvados.util.keep_locator_pattern, x) else x) for x in fields[1:]] clean += [' '.join(clean_fields), "\n"] @@ -180,7 +189,7 @@ class CollectionWriter(CollectionBase): def _work_trees(self): path, stream_name, max_manifest_depth = self._queued_trees[0] - d = util.listdir_recursive( + d = arvados.util.listdir_recursive( path, max_depth = (None if max_manifest_depth == 0 else 0)) if d: self._queue_dirents(stream_name, d) @@ -216,7 +225,11 @@ class CollectionWriter(CollectionBase): self.do_queued_work() def write(self, newdata): - if hasattr(newdata, '__iter__'): + if isinstance(newdata, bytes): + pass + elif isinstance(newdata, str): + newdata = newdata.encode() + elif hasattr(newdata, '__iter__'): for s in newdata: self.write(s) return @@ -256,7 +269,7 @@ class CollectionWriter(CollectionBase): return self._last_open def flush_data(self): - data_buffer = ''.join(self._data_buffer) + data_buffer = b''.join(self._data_buffer) if data_buffer: self._current_stream_locators.append( self._my_keep().put( @@ -346,11 +359,12 @@ class CollectionWriter(CollectionBase): sending manifest_text() to the API server's "create collection" endpoint. """ - return self._my_keep().put(self.manifest_text(), copies=self.replication) + return self._my_keep().put(self.manifest_text().encode(), + copies=self.replication) def portable_data_hash(self): - stripped = self.stripped_manifest() - return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped)) + stripped = self.stripped_manifest().encode() + return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) def manifest_text(self): self.finish_current_stream() @@ -418,7 +432,7 @@ class ResumableCollectionWriter(CollectionWriter): return writer def check_dependencies(self): - for path, orig_stat in self._dependencies.items(): + for path, orig_stat in listitems(self._dependencies): if not S_ISREG(orig_stat[ST_MODE]): raise errors.StaleWriterStateError("{} not file".format(path)) try: @@ -612,7 +626,12 @@ class RichCollectionBase(CollectionBase): :path: path to a file in the collection :mode: - one of "r", "r+", "w", "w+", "a", "a+" + a string consisting of "r", "w", or "a", optionally followed + by "b" or "t", optionally followed by "+". + :"b": + binary mode: write() accepts bytes, read() returns bytes. + :"t": + text mode (default): write() accepts strings, read() returns strings. :"r": opens for reading :"r+": @@ -624,33 +643,28 @@ class RichCollectionBase(CollectionBase): the end of the file. Writing does not affect the file pointer for reading. """ - mode = mode.replace("b", "") - if len(mode) == 0 or mode[0] not in ("r", "w", "a"): - raise errors.ArgumentError("Bad mode '%s'" % mode) - create = (mode != "r") - if create and not self.writable(): - raise IOError(errno.EROFS, "Collection is read only") + if not re.search(r'^[rwa][bt]?\+?$', mode): + raise errors.ArgumentError("Invalid mode {!r}".format(mode)) - if create: - arvfile = self.find_or_create(path, FILE) - else: + if mode[0] == 'r' and '+' not in mode: + fclass = ArvadosFileReader arvfile = self.find(path) + elif not self.writable(): + raise IOError(errno.EROFS, "Collection is read only") + else: + fclass = ArvadosFileWriter + arvfile = self.find_or_create(path, FILE) if arvfile is None: raise IOError(errno.ENOENT, "File not found", path) if not isinstance(arvfile, ArvadosFile): raise IOError(errno.EISDIR, "Is a directory", path) - if mode[0] == "w": + if mode[0] == 'w': arvfile.truncate(0) - name = os.path.basename(path) - - if mode == "r": - return ArvadosFileReader(arvfile, num_retries=self.num_retries) - else: - return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries) + return fclass(arvfile, mode=mode, num_retries=self.num_retries) def modified(self): """Determine if the collection has been modified since last commited.""" @@ -672,7 +686,7 @@ class RichCollectionBase(CollectionBase): if value == self._committed: return if value: - for k,v in self._items.items(): + for k,v in listitems(self._items): v.set_committed(True) self._committed = True else: @@ -683,7 +697,7 @@ class RichCollectionBase(CollectionBase): @synchronized def __iter__(self): """Iterate over names of files and collections contained in this collection.""" - return iter(self._items.keys()) + return iter(viewkeys(self._items)) @synchronized def __getitem__(self, k): @@ -720,12 +734,12 @@ class RichCollectionBase(CollectionBase): @synchronized def values(self): """Get a list of files and collection objects directly contained in this collection.""" - return self._items.values() + return listvalues(self._items) @synchronized def items(self): """Get a list of (name, object) tuples directly contained in this collection.""" - return self._items.items() + return listitems(self._items) def exists(self, path): """Test if there is a file or collection at `path`.""" @@ -758,7 +772,7 @@ class RichCollectionBase(CollectionBase): item.remove(pathcomponents[1]) def _clonefrom(self, source): - for k,v in source.items(): + for k,v in listitems(source): self._items[k] = v.clone(self, k) def clone(self): @@ -1074,8 +1088,8 @@ class RichCollectionBase(CollectionBase): # then return API server's PDH response. return self._portable_data_hash else: - stripped = self.portable_manifest_text() - return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped)) + stripped = self.portable_manifest_text().encode() + return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) @synchronized def subscribe(self, callback): @@ -1116,7 +1130,7 @@ class RichCollectionBase(CollectionBase): @synchronized def flush(self): """Flush bufferblocks to Keep.""" - for e in self.values(): + for e in listvalues(self): e.flush() @@ -1222,11 +1236,11 @@ class Collection(RichCollectionBase): self.events = None if manifest_locator_or_text: - if re.match(util.keep_locator_pattern, manifest_locator_or_text): + if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text): self._manifest_locator = manifest_locator_or_text - elif re.match(util.collection_uuid_pattern, manifest_locator_or_text): + elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): self._manifest_locator = manifest_locator_or_text - elif re.match(util.manifest_pattern, manifest_locator_or_text): + elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): self._manifest_text = manifest_locator_or_text else: raise errors.ArgumentError( @@ -1316,6 +1330,7 @@ class Collection(RichCollectionBase): uuid=self._manifest_locator).execute( num_retries=self.num_retries)) self._manifest_text = self._api_response['manifest_text'] + self._portable_data_hash = self._api_response['portable_data_hash'] # If not overriden via kwargs, we should try to load the # replication_desired from the API server if self.replication_desired is None: @@ -1331,7 +1346,7 @@ class Collection(RichCollectionBase): # mode. Return an exception, or None if successful. try: self._manifest_text = self._my_keep().get( - self._manifest_locator, num_retries=self.num_retries) + self._manifest_locator, num_retries=self.num_retries).decode() except Exception as e: return e @@ -1341,10 +1356,10 @@ class Collection(RichCollectionBase): error_via_api = None error_via_keep = None should_try_keep = ((self._manifest_text is None) and - util.keep_locator_pattern.match( + arvados.util.keep_locator_pattern.match( self._manifest_locator)) if ((self._manifest_text is None) and - util.signed_locator_pattern.match(self._manifest_locator)): + arvados.util.signed_locator_pattern.match(self._manifest_locator)): error_via_keep = self._populate_from_keep() if self._manifest_text is None: error_via_api = self._populate_from_api_server() @@ -1370,7 +1385,7 @@ class Collection(RichCollectionBase): def _has_collection_uuid(self): - return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator) + return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator) def __enter__(self): return self @@ -1574,7 +1589,7 @@ class Collection(RichCollectionBase): stream_name = tok.replace('\\040', ' ') blocks = [] segments = [] - streamoffset = 0L + streamoffset = 0 state = BLOCKS self.find_or_create(stream_name, COLLECTION) continue @@ -1582,7 +1597,7 @@ class Collection(RichCollectionBase): if state == BLOCKS: block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok) if block_locator: - blocksize = long(block_locator.group(1)) + blocksize = int(block_locator.group(1)) blocks.append(Range(tok, streamoffset, blocksize, 0)) streamoffset += blocksize else: @@ -1591,8 +1606,8 @@ class Collection(RichCollectionBase): if state == SEGMENTS: file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok) if file_segment: - pos = long(file_segment.group(1)) - size = long(file_segment.group(2)) + pos = int(file_segment.group(1)) + size = int(file_segment.group(2)) name = file_segment.group(3).replace('\\040', ' ') filepath = os.path.join(stream_name, name) afile = self.find_or_create(filepath, FILE)