X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a830b5b560251c3143a7b1fd60db3f50a7021b34..0e0c1400b57d5de8aa8c18dd4897527f905a4b42:/sdk/python/arvados/collection.py

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 38e794c24a..0d88084340 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1,3 +1,7 @@
+from __future__ import absolute_import
+from builtins import str
+from past.builtins import basestring
+from builtins import object
 import functools
 import logging
 import os
@@ -11,15 +15,15 @@ from collections import deque
 from stat import *

 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
-from keep import KeepLocator, KeepClient
+from .keep import KeepLocator, KeepClient
 from .stream import StreamReader
 from ._normalize_stream import normalize_stream
 from ._ranges import Range, LocatorAndRange
 from .safeapi import ThreadSafeApiCache
-import config
-import errors
-import util
-import events
+import arvados.config as config
+import arvados.errors as errors
+import arvados.util
+import arvados.events as events
 from arvados.retry import retry_method

 _logger = logging.getLogger('arvados.collection')
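For readers tracking the Python 3 groundwork: once `from __future__ import absolute_import` is in effect, a bare `import config` no longer finds the sibling module inside the `arvados` package, which is why the imports above switch to explicit relative or fully qualified forms. An illustrative restatement, not part of the patch:

# Inside a module of the arvados package, with absolute_import in force:
from .keep import KeepLocator, KeepClient   # explicit relative import
import arvados.util                         # or the fully qualified absolute form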
@@ -51,7 +55,7 @@ class CollectionBase(object):
         if fields:
             clean_fields = fields[:1] + [
                 (re.sub(r'\+[^\d][^\+]*', '', x)
-                 if re.match(util.keep_locator_pattern, x)
+                 if re.match(arvados.util.keep_locator_pattern, x)
                  else x)
                 for x in fields[1:]]
             clean += [' '.join(clean_fields), "\n"]
@@ -180,7 +184,7 @@ class CollectionWriter(CollectionBase):

     def _work_trees(self):
         path, stream_name, max_manifest_depth = self._queued_trees[0]
-        d = util.listdir_recursive(
+        d = arvados.util.listdir_recursive(
             path, max_depth = (None if max_manifest_depth == 0 else 0))
         if d:
             self._queue_dirents(stream_name, d)
@@ -307,7 +311,8 @@ class CollectionWriter(CollectionBase):
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[\t\n]', newstreamname):
             raise errors.AssertionError(
-                "Manifest stream names cannot contain whitespace")
+                "Manifest stream names cannot contain whitespace: '%s'" %
+                (newstreamname))
         self._current_stream_name = '.' if newstreamname=='' else newstreamname

     def current_stream_name(self):
@@ -417,7 +422,7 @@ class ResumableCollectionWriter(CollectionWriter):
         return writer

     def check_dependencies(self):
-        for path, orig_stat in self._dependencies.items():
+        for path, orig_stat in list(self._dependencies.items()):
             if not S_ISREG(orig_stat[ST_MODE]):
                 raise errors.StaleWriterStateError("{} not file".format(path))
             try:
@@ -474,6 +479,7 @@ class ResumableCollectionWriter(CollectionWriter):
 ADD = "add"
 DEL = "del"
 MOD = "mod"
+TOK = "tok"
 FILE = "file"
 COLLECTION = "collection"

@@ -542,7 +548,7 @@ class RichCollectionBase(CollectionBase):
                     else:
                         item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
-                    self._committed = False
+                    self.set_committed(False)
                     self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
@@ -550,12 +556,12 @@ class RichCollectionBase(CollectionBase):
                     # create new collection
                     item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
-                    self._committed = False
+                    self.set_committed(False)
                     self.notify(ADD, self, pathcomponents[0], item)
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
-                    raise IOError(errno.ENOTDIR, "Not a directory: '%s'" % pathcomponents[0])
+                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
         else:
             return self

@@ -563,16 +569,23 @@ class RichCollectionBase(CollectionBase):
     def find(self, path):
         """Recursively search the specified file path.

-        May return either a Collection or ArvadosFile.  Return None if not
+        May return either a Collection or ArvadosFile. Return None if not
         found.
+        If path is invalid (ex: starts with '/'), an IOError exception will be
+        raised.

         """
         if not path:
             raise errors.ArgumentError("Parameter 'path' is empty.")

         pathcomponents = path.split("/", 1)
+        if pathcomponents[0] == '':
+            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+
         item = self._items.get(pathcomponents[0])
-        if len(pathcomponents) == 1:
+        if item is None:
+            return None
+        elif len(pathcomponents) == 1:
             return item
         else:
             if isinstance(item, RichCollectionBase):
@@ -581,7 +594,7 @@ class RichCollectionBase(CollectionBase):
                 else:
                     return item
             else:
-                raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
+                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

     @synchronized
     def mkdirs(self, path):
@@ -593,7 +606,7 @@ class RichCollectionBase(CollectionBase):

         """
         if self.find(path) != None:
-            raise IOError(errno.EEXIST, "Directory or file exists: '%s'" % path)
+            raise IOError(errno.EEXIST, "Directory or file exists", path)

         return self.find_or_create(path, COLLECTION)

@@ -629,9 +642,9 @@ class RichCollectionBase(CollectionBase):

         arvfile = self.find(path)
         if arvfile is None:
-            raise IOError(errno.ENOENT, "File not found")
+            raise IOError(errno.ENOENT, "File not found", path)
         if not isinstance(arvfile, ArvadosFile):
-            raise IOError(errno.EISDIR, "Is a directory: %s" % path)
+            raise IOError(errno.EISDIR, "Is a directory", path)

         if mode[0] == "w":
             arvfile.truncate(0)
@@ -650,25 +663,31 @@ class RichCollectionBase(CollectionBase):
     @synchronized
     def committed(self):
         """Determine if the collection has been committed to the API server."""
-
-        if self._committed is False:
-            return False
-        for v in self._items.values():
-            if v.committed() is False:
-                return False
-        return True
+        return self._committed

     @synchronized
-    def set_committed(self):
-        """Recursively set committed flag to True."""
-        self._committed = True
-        for k,v in self._items.items():
-            v.set_committed()
+    def set_committed(self, value=True):
+        """Recursively set committed flag.
+
+        If value is True, set committed to be True for this and all children.
+
+        If value is False, set committed to be False for this and all parents.
+        """
+        if value == self._committed:
+            return
+        if value:
+            for k,v in list(self._items.items()):
+                v.set_committed(True)
+            self._committed = True
+        else:
+            self._committed = False
+            if self.parent is not None:
+                self.parent.set_committed(False)

     @synchronized
     def __iter__(self):
         """Iterate over names of files and collections contained in this collection."""
-        return iter(self._items.keys())
+        return iter(list(self._items.keys()))

     @synchronized
     def __getitem__(self, k):
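The reworked set_committed() above is deliberately asymmetric: True is pushed down to children (saving a collection marks everything beneath it committed), while False bubbles up to parents (modifying any item dirties every enclosing collection). A toy sketch of just that propagation rule, independent of the SDK classes and written only to illustrate the direction of each update:

class Node(object):
    """Toy stand-in for RichCollectionBase, tracking only the committed flag."""
    def __init__(self, parent=None):
        self.parent = parent
        self.children = []
        self._committed = False

    def set_committed(self, value=True):
        if value == self._committed:
            return
        if value:
            # True propagates downward: children are committed along with us.
            for child in self.children:
                child.set_committed(True)
            self._committed = True
        else:
            # False propagates upward: a modified child dirties its ancestors.
            self._committed = False
            if self.parent is not None:
                self.parent.set_committed(False)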
@@ -694,23 +713,23 @@ class RichCollectionBase(CollectionBase):
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
-        self._committed = False
+        self.set_committed(False)
         self.notify(DEL, self, p, None)

     @synchronized
     def keys(self):
         """Get a list of names of files and collections directly contained in this collection."""
-        return self._items.keys()
+        return list(self._items.keys())

     @synchronized
     def values(self):
         """Get a list of files and collection objects directly contained in this collection."""
-        return self._items.values()
+        return list(self._items.values())

     @synchronized
     def items(self):
         """Get a list of (name, object) tuples directly contained in this collection."""
-        return self._items.items()
+        return list(self._items.items())

     def exists(self, path):
         """Test if there is a file or collection at `path`."""
@@ -731,19 +750,19 @@ class RichCollectionBase(CollectionBase):
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
         if item is None:
-            raise IOError(errno.ENOENT, "File not found")
+            raise IOError(errno.ENOENT, "File not found", path)
         if len(pathcomponents) == 1:
             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
-                raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
+                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
-            self._committed = False
+            self.set_committed(False)
             self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
             item.remove(pathcomponents[1])

     def _clonefrom(self, source):
-        for k,v in source.items():
+        for k,v in list(source.items()):
             self._items[k] = v.clone(self, k)

     def clone(self):
@@ -772,7 +791,7 @@ class RichCollectionBase(CollectionBase):

         """
         if target_name in self and not overwrite:
-            raise IOError(errno.EEXIST, "File already exists")
+            raise IOError(errno.EEXIST, "File already exists", target_name)

         modified_from = None
         if target_name in self:
@@ -786,7 +805,7 @@ class RichCollectionBase(CollectionBase):
             item = source_obj.clone(self, target_name)

         self._items[target_name] = item
-        self._committed = False
+        self.set_committed(False)

         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
@@ -801,7 +820,7 @@ class RichCollectionBase(CollectionBase):
         if isinstance(source, basestring):
             source_obj = source_collection.find(source)
             if source_obj is None:
-                raise IOError(errno.ENOENT, "File not found")
+                raise IOError(errno.ENOENT, "File not found", source)
             sourcecomponents = source.split("/")
         else:
             source_obj = source
@@ -825,9 +844,9 @@ class RichCollectionBase(CollectionBase):
             target_dir = self

         if target_dir is None:
-            raise IOError(errno.ENOENT, "Target directory not found.")
+            raise IOError(errno.ENOENT, "Target directory not found", target_name)

-        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
             target_dir = target_dir[target_name]
             target_name = sourcecomponents[-1]

@@ -880,7 +899,7 @@ class RichCollectionBase(CollectionBase):
         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)

         if not source_obj.writable():
-            raise IOError(errno.EROFS, "Source collection is read only.")
+            raise IOError(errno.EROFS, "Source collection is read only", source)
         target_dir.add(source_obj, target_name, overwrite, True)

     def portable_manifest_text(self, stream_name="."):
@@ -896,7 +915,8 @@ class RichCollectionBase(CollectionBase):
         return self._get_manifest_text(stream_name, True, True)

     @synchronized
-    def manifest_text(self, stream_name=".", strip=False, normalize=False):
+    def manifest_text(self, stream_name=".", strip=False, normalize=False,
+                      only_committed=False):
         """Get the manifest text for this collection, sub collections and files.

         This method will flush outstanding blocks to Keep. By default, it will
@@ -915,13 +935,18 @@ class RichCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.

+        :only_committed:
+          If True, don't commit pending blocks.
+
         """
-        self._my_block_manager().commit_all()
-        return self._get_manifest_text(stream_name, strip, normalize)
+        if not only_committed:
+            self._my_block_manager().commit_all()
+        return self._get_manifest_text(stream_name, strip, normalize,
+                                       only_committed=only_committed)

     @synchronized
-    def _get_manifest_text(self, stream_name, strip, normalize):
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
         """Get the manifest text for this collection, sub collections and files.

         :stream_name:
@@ -937,6 +962,9 @@ class RichCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.

+        :only_committed:
+          If True, only include blocks that were already committed to Keep.
+
         """

         if not self.committed() or self._manifest_text is None or normalize:
@@ -950,6 +978,8 @@ class RichCollectionBase(CollectionBase):
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
+                        if only_committed:
+                            continue
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
@@ -959,7 +989,7 @@ class RichCollectionBase(CollectionBase):
             if stream:
                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
-                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
+                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
             return "".join(buf)
         else:
             if strip:
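Together, the only_committed plumbing above lets callers render a manifest without forcing a flush to Keep; pending bufferblocks are simply skipped. A hedged usage sketch, where coll stands for any writable arvados.collection.Collection holding unflushed data:

snapshot = coll.manifest_text(only_committed=True)  # skips commit_all(); pending bufferblocks omitted
full = coll.manifest_text()                         # default path: flush pending blocks, then render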
@@ -986,6 +1016,8 @@ class RichCollectionBase(CollectionBase):
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
+                else:
+                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
             else:
                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
         return changes
@@ -1000,7 +1032,7 @@ class RichCollectionBase(CollectionBase):

         """
         if changes:
-            self._committed = False
+            self.set_committed(False)
         for change in changes:
             event_type = change[0]
             path = change[1]
@@ -1016,7 +1048,7 @@ class RichCollectionBase(CollectionBase):
                     # There is already local file and it is different:
                     # save change to conflict file.
                     self.copy(initial, conflictpath)
-            elif event_type == MOD:
+            elif event_type == MOD or event_type == TOK:
                 final = change[3]
                 if local == initial:
                     # Local matches the "initial" item so it has not
@@ -1041,8 +1073,13 @@ class RichCollectionBase(CollectionBase):
     @synchronized
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
-        stripped = self.portable_manifest_text()
-        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+        if self._manifest_locator and self.committed():
+            # If the collection is already saved on the API server, and it's committed
+            # then return API server's PDH response.
+            return self._portable_data_hash
+        else:
+            stripped = self.portable_manifest_text()
+            return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

     @synchronized
     def subscribe(self, callback):
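When the hash has to be computed locally (the else branch above), the portable data hash is the MD5 of the stripped manifest text followed by '+' and the text's length. A small worked example with a made-up one-file manifest (the .encode() is for Python 3 strings; the code above hashes the text it is given directly):

import hashlib

manifest = ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar.txt\n"
pdh = hashlib.md5(manifest.encode()).hexdigest() + '+' + str(len(manifest))
# pdh ends in "+49" because this manifest is 49 characters long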
@@ -1083,7 +1120,7 @@ class RichCollectionBase(CollectionBase):
     @synchronized
     def flush(self):
         """Flush bufferblocks to Keep."""
-        for e in self.values():
+        for e in list(self.values()):
             e.flush()


@@ -1132,7 +1169,9 @@ class Collection(RichCollectionBase):
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
-                 block_manager=None):
+                 block_manager=None,
+                 replication_desired=None,
+                 put_threads=None):
         """Collection constructor.

         :manifest_locator_or_text:
@@ -1140,24 +1179,36 @@ class Collection(RichCollectionBase):
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
+
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
+
         :api_client:
           The API client object to use for requests. If not specified, create one using `apiconfig`.
+
         :keep_client:
           the Keep client to use for requests. If not specified, create one using `apiconfig`.
+
         :num_retries:
           the number of retries for API and Keep requests.
+
         :block_manager:
           the block manager to use. If not specified, create one.

+        :replication_desired:
+          How many copies should Arvados maintain. If None, API server default
+          configuration applies. If not None, this value will also be used
+          for determining the number of block copies being written.
+
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
+        self.replication_desired = replication_desired
+        self.put_threads = put_threads

         if apiconfig:
             self._config = apiconfig
@@ -1167,6 +1218,7 @@ class Collection(RichCollectionBase):
         self.num_retries = num_retries if num_retries is not None else 0
         self._manifest_locator = None
         self._manifest_text = None
+        self._portable_data_hash = None
         self._api_response = None
         self._past_versions = set()

@@ -1174,11 +1226,11 @@ class Collection(RichCollectionBase):
         self.events = None

         if manifest_locator_or_text:
-            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
+            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
-            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
+            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
-            elif re.match(util.manifest_pattern, manifest_locator_or_text):
+            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                 self._manifest_text = manifest_locator_or_text
             else:
                 raise errors.ArgumentError(
@@ -1211,8 +1263,12 @@ class Collection(RichCollectionBase):
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
-            if self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))):
-                # We've merged this record this before. Don't do anything.
+            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
+                response.get("portable_data_hash") != self.portable_data_hash()):
+                # The record on the server is different from our current one, but we've seen it before,
+                # so ignore it because it's already been merged.
+                # However, if it's the same as our current record, proceed with the update, because we want to update
+                # our tokens.
                 return
             else:
                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
@@ -1225,7 +1281,8 @@ class Collection(RichCollectionBase):
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
-            self._keep_client = self._api_client.keep
+            if self._keep_client is None:
+                self._keep_client = self._api_client.keep
         return self._api_client

     @synchronized
@@ -1240,7 +1297,10 @@ class Collection(RichCollectionBase):
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
-            self._block_manager = _BlockManager(self._my_keep())
+            copies = (self.replication_desired or
+                      self._my_api()._rootDesc.get('defaultCollectionReplication',
+                                                   2))
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
         return self._block_manager

     def _remember_api_response(self, response):
@@ -1260,6 +1320,11 @@ class Collection(RichCollectionBase):
                 uuid=self._manifest_locator).execute(
                     num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
+            self._portable_data_hash = self._api_response['portable_data_hash']
+            # If not overriden via kwargs, we should try to load the
+            # replication_desired from the API server
+            if self.replication_desired is None:
+                self.replication_desired = self._api_response.get('replication_desired', None)
             return None
         except Exception as e:
             return e
@@ -1281,10 +1346,10 @@ class Collection(RichCollectionBase):
         error_via_api = None
         error_via_keep = None
         should_try_keep = ((self._manifest_text is None) and
-                           util.keep_locator_pattern.match(
+                           arvados.util.keep_locator_pattern.match(
                                self._manifest_locator))
         if ((self._manifest_text is None) and
-            util.signed_locator_pattern.match(self._manifest_locator)):
+            arvados.util.signed_locator_pattern.match(self._manifest_locator)):
             error_via_keep = self._populate_from_keep()
         if self._manifest_text is None:
             error_via_api = self._populate_from_api_server()
@@ -1310,7 +1375,7 @@ class Collection(RichCollectionBase):


     def _has_collection_uuid(self):
-        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
+        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)

     def __enter__(self):
         return self
@@ -1421,7 +1486,8 @@ class Collection(RichCollectionBase):
                 ).execute(
                     num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
-            self.set_committed()
+            self._portable_data_hash = self._api_response["portable_data_hash"]
+            self.set_committed(True)

         return self._manifest_text

@@ -1470,7 +1536,8 @@ class Collection(RichCollectionBase):
             ensure_unique_name = True

         body = {"manifest_text": text,
-                "name": name}
+                "name": name,
+                "replication_desired": self.replication_desired}
         if owner_uuid:
             body["owner_uuid"] = owner_uuid

@@ -1478,9 +1545,10 @@ class Collection(RichCollectionBase):

         text = self._api_response["manifest_text"]
         self._manifest_locator = self._api_response["uuid"]
+        self._portable_data_hash = self._api_response["portable_data_hash"]

         self._manifest_text = text
-        self.set_committed()
+        self.set_committed(True)

         return text
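The new constructor arguments flow through to both the block manager (copies and put_threads when writing to Keep) and the saved API record (replication_desired is added to the save_new request body). A hedged usage sketch; the collection name is made up, and a configured API host and token are assumed:

from arvados.collection import Collection

c = Collection(replication_desired=3, put_threads=4)
# ... add or copy files into c ...
c.save_new(name="replicated example")   # request body includes replication_desired: 3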
@@ -1511,7 +1579,7 @@ class Collection(RichCollectionBase):
                 stream_name = tok.replace('\\040', ' ')
                 blocks = []
                 segments = []
-                streamoffset = 0L
+                streamoffset = 0
                 state = BLOCKS
                 self.find_or_create(stream_name, COLLECTION)
                 continue
@@ -1519,7 +1587,7 @@ class Collection(RichCollectionBase):
             if state == BLOCKS:
                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                 if block_locator:
-                    blocksize = long(block_locator.group(1))
+                    blocksize = int(block_locator.group(1))
                     blocks.append(Range(tok, streamoffset, blocksize, 0))
                     streamoffset += blocksize
                 else:
@@ -1528,8 +1596,8 @@ class Collection(RichCollectionBase):
             if state == SEGMENTS:
                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
                 if file_segment:
-                    pos = long(file_segment.group(1))
-                    size = long(file_segment.group(2))
+                    pos = int(file_segment.group(1))
+                    size = int(file_segment.group(2))
                     name = file_segment.group(3).replace('\\040', ' ')
                     filepath = os.path.join(stream_name, name)
                     afile = self.find_or_create(filepath, FILE)
@@ -1545,7 +1613,7 @@ class Collection(RichCollectionBase):

         stream_name = None
         state = STREAM_NAME
-        self.set_committed()
+        self.set_committed(True)

     @synchronized
     def notify(self, event, collection, name, item):
@@ -1595,7 +1663,7 @@ class Subcollection(RichCollectionBase):
     @must_be_writable
     @synchronized
     def _reparent(self, newparent, newname):
-        self._committed = False
+        self.set_committed(False)
         self.flush()
         self.parent.remove(self.name, recursive=True)
         self.parent = newparent
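For reference, the tokens that _import_manifest() walks above make up lines of: a stream name, one or more block locators (32 hex digits, '+', the block size), then pos:size:name file segments. A minimal, hedged sketch of feeding such a manifest to the constructor; the digest is the MD5 of the three-byte string "bar", so the sizes are consistent:

from arvados.collection import Collection

manifest = ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar.txt\n"
c = Collection(manifest)   # parsed by _import_manifest(): stream ".", one block, one file "bar.txt"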