X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/068bc457a3ebe395d3920bb1e63369fcdd9268f7..d6cccb3ea4e5f076a436d9935e3835d4b620b859:/sdk/python/arvados/collection.py diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 6b33ea3030..627f0346db 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -7,21 +7,23 @@ from future.utils import listitems, listvalues, viewkeys from builtins import str from past.builtins import basestring from builtins import object +import ciso8601 +import datetime +import errno import functools +import hashlib +import io import logging import os import re -import errno -import hashlib -import datetime -import ciso8601 -import time +import sys import threading +import time from collections import deque from stat import * -from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock +from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock from .keep import KeepLocator, KeepClient from .stream import StreamReader from ._normalize_stream import normalize_stream @@ -35,6 +37,21 @@ from arvados.retry import retry_method _logger = logging.getLogger('arvados.collection') + +if sys.version_info >= (3, 0): + TextIOWrapper = io.TextIOWrapper +else: + class TextIOWrapper(io.TextIOWrapper): + """To maintain backward compatibility, cast str to unicode in + write('foo'). + + """ + def write(self, data): + if isinstance(data, basestring): + data = unicode(data) + return super(TextIOWrapper, self).write(data) + + class CollectionBase(object): """Abstract base class for Collection classes.""" @@ -520,6 +537,7 @@ class RichCollectionBase(CollectionBase): def __init__(self, parent=None): self.parent = parent self._committed = False + self._has_remote_blocks = False self._callback = None self._items = {} @@ -544,6 +562,23 @@ class RichCollectionBase(CollectionBase): def stream_name(self): raise NotImplementedError() + @synchronized + def has_remote_blocks(self): + """Recursively check for a +R segment locator signature.""" + + if self._has_remote_blocks: + return True + for item in self: + if self[item].has_remote_blocks(): + return True + return False + + @synchronized + def set_has_remote_blocks(self, val): + self._has_remote_blocks = val + if self.parent: + self.parent.set_has_remote_blocks(val) + @must_be_writable @synchronized def find_or_create(self, path, create_type): @@ -636,7 +671,7 @@ class RichCollectionBase(CollectionBase): return self.find_or_create(path, COLLECTION) - def open(self, path, mode="r"): + def open(self, path, mode="r", encoding=None): """Open a file-like object for access. :path: @@ -658,6 +693,7 @@ class RichCollectionBase(CollectionBase): opens for reading and writing. All writes are appended to the end of the file. Writing does not affect the file pointer for reading. + """ if not re.search(r'^[rwa][bt]?\+?$', mode): @@ -680,7 +716,12 @@ class RichCollectionBase(CollectionBase): if mode[0] == 'w': arvfile.truncate(0) - return fclass(arvfile, mode=mode, num_retries=self.num_retries) + binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) + f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) + if 'b' not in mode: + bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader + f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) + return f def modified(self): """Determine if the collection has been modified since last commited.""" @@ -832,6 +873,8 @@ class RichCollectionBase(CollectionBase): self._items[target_name] = item self.set_committed(False) + if not self._has_remote_blocks and source_obj.has_remote_blocks(): + self.set_has_remote_blocks(True) if modified_from: self.notify(MOD, self, target_name, (modified_from, item)) @@ -1037,18 +1080,8 @@ class RichCollectionBase(CollectionBase): different subdirectories. """ - for filename in [f for f in self.keys() if isinstance(self[f], ArvadosFile)]: - for s in self[filename].segments(): - if '+R' in s.locator: - try: - loc = remote_blocks[s.locator] - except KeyError: - loc = self._my_keep().refresh_signature(s.locator) - remote_blocks[s.locator] = loc - s.locator = loc - self.set_committed(False) - for dirname in [d for d in self.keys() if isinstance(self[d], RichCollectionBase)]: - remote_blocks = self[dirname]._copy_remote_blocks(remote_blocks) + for item in self: + remote_blocks = self[item]._copy_remote_blocks(remote_blocks) return remote_blocks @synchronized @@ -1285,8 +1318,12 @@ class Collection(RichCollectionBase): self._manifest_locator = manifest_locator_or_text elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): self._manifest_locator = manifest_locator_or_text + if not self._has_local_collection_uuid(): + self._has_remote_blocks = True elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): self._manifest_text = manifest_locator_or_text + if '+R' in self._manifest_text: + self._has_remote_blocks = True else: raise errors.ArgumentError( "Argument to CollectionReader is not a manifest or a collection UUID") @@ -1536,10 +1573,11 @@ class Collection(RichCollectionBase): t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") body["trash_at"] = t - # Copy any remote blocks to the local cluster. - self._copy_remote_blocks(remote_blocks={}) - if not self.committed(): + if self._has_remote_blocks: + # Copy any remote blocks to the local cluster. + self._copy_remote_blocks(remote_blocks={}) + self._has_remote_blocks = False if not self._has_collection_uuid(): raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.") elif not self._has_local_collection_uuid(): @@ -1628,8 +1666,10 @@ class Collection(RichCollectionBase): if trash_at and type(trash_at) is not datetime.datetime: raise errors.ArgumentError("trash_at must be datetime type.") - # Copy any remote blocks to the local cluster. - self._copy_remote_blocks(remote_blocks={}) + if self._has_remote_blocks: + # Copy any remote blocks to the local cluster. + self._copy_remote_blocks(remote_blocks={}) + self._has_remote_blocks = False self._my_block_manager().commit_all() text = self.manifest_text(strip=False)