X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/333a30289f47c22956bcb6a045d1cd4c797e22c3..d6cccb3ea4e5f076a436d9935e3835d4b620b859:/sdk/python/arvados/collection.py diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index e38a6bd475..627f0346db 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -7,21 +7,23 @@ from future.utils import listitems, listvalues, viewkeys from builtins import str from past.builtins import basestring from builtins import object +import ciso8601 +import datetime +import errno import functools +import hashlib +import io import logging import os import re -import errno -import hashlib -import datetime -import ciso8601 -import time +import sys import threading +import time from collections import deque from stat import * -from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock +from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock from .keep import KeepLocator, KeepClient from .stream import StreamReader from ._normalize_stream import normalize_stream @@ -35,6 +37,21 @@ from arvados.retry import retry_method _logger = logging.getLogger('arvados.collection') + +if sys.version_info >= (3, 0): + TextIOWrapper = io.TextIOWrapper +else: + class TextIOWrapper(io.TextIOWrapper): + """To maintain backward compatibility, cast str to unicode in + write('foo'). + + """ + def write(self, data): + if isinstance(data, basestring): + data = unicode(data) + return super(TextIOWrapper, self).write(data) + + class CollectionBase(object): """Abstract base class for Collection classes.""" @@ -520,6 +537,7 @@ class RichCollectionBase(CollectionBase): def __init__(self, parent=None): self.parent = parent self._committed = False + self._has_remote_blocks = False self._callback = None self._items = {} @@ -544,6 +562,23 @@ class RichCollectionBase(CollectionBase): def stream_name(self): raise NotImplementedError() + @synchronized + def has_remote_blocks(self): + """Recursively check for a +R segment locator signature.""" + + if self._has_remote_blocks: + return True + for item in self: + if self[item].has_remote_blocks(): + return True + return False + + @synchronized + def set_has_remote_blocks(self, val): + self._has_remote_blocks = val + if self.parent: + self.parent.set_has_remote_blocks(val) + @must_be_writable @synchronized def find_or_create(self, path, create_type): @@ -636,7 +671,7 @@ class RichCollectionBase(CollectionBase): return self.find_or_create(path, COLLECTION) - def open(self, path, mode="r"): + def open(self, path, mode="r", encoding=None): """Open a file-like object for access. :path: @@ -658,6 +693,7 @@ class RichCollectionBase(CollectionBase): opens for reading and writing. All writes are appended to the end of the file. Writing does not affect the file pointer for reading. + """ if not re.search(r'^[rwa][bt]?\+?$', mode): @@ -680,7 +716,12 @@ class RichCollectionBase(CollectionBase): if mode[0] == 'w': arvfile.truncate(0) - return fclass(arvfile, mode=mode, num_retries=self.num_retries) + binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) + f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) + if 'b' not in mode: + bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader + f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) + return f def modified(self): """Determine if the collection has been modified since last commited.""" @@ -832,6 +873,8 @@ class RichCollectionBase(CollectionBase): self._items[target_name] = item self.set_committed(False) + if not self._has_remote_blocks and source_obj.has_remote_blocks(): + self.set_has_remote_blocks(True) if modified_from: self.notify(MOD, self, target_name, (modified_from, item)) @@ -1023,6 +1066,24 @@ class RichCollectionBase(CollectionBase): else: return self._manifest_text + @synchronized + def _copy_remote_blocks(self, remote_blocks={}): + """Scan through the entire collection and ask Keep to copy remote blocks. + + When accessing a remote collection, blocks will have a remote signature + (+R instead of +A). Collect these signatures and request Keep to copy the + blocks to the local cluster, returning local (+A) signatures. + + :remote_blocks: + Shared cache of remote to local block mappings. This is used to avoid + doing extra work when blocks are shared by more than one file in + different subdirectories. + + """ + for item in self: + remote_blocks = self[item]._copy_remote_blocks(remote_blocks) + return remote_blocks + @synchronized def diff(self, end_collection, prefix=".", holding_collection=None): """Generate list of add/modify/delete actions. @@ -1257,8 +1318,12 @@ class Collection(RichCollectionBase): self._manifest_locator = manifest_locator_or_text elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): self._manifest_locator = manifest_locator_or_text + if not self._has_local_collection_uuid(): + self._has_remote_blocks = True elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): self._manifest_text = manifest_locator_or_text + if '+R' in self._manifest_text: + self._has_remote_blocks = True else: raise errors.ArgumentError( "Argument to CollectionReader is not a manifest or a collection UUID") @@ -1376,6 +1441,10 @@ class Collection(RichCollectionBase): def _has_collection_uuid(self): return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator) + def _has_local_collection_uuid(self): + return self._has_collection_uuid and \ + self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0] + def __enter__(self): return self @@ -1505,8 +1574,14 @@ class Collection(RichCollectionBase): body["trash_at"] = t if not self.committed(): + if self._has_remote_blocks: + # Copy any remote blocks to the local cluster. + self._copy_remote_blocks(remote_blocks={}) + self._has_remote_blocks = False if not self._has_collection_uuid(): raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.") + elif not self._has_local_collection_uuid(): + raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.") self._my_block_manager().commit_all() @@ -1591,6 +1666,11 @@ class Collection(RichCollectionBase): if trash_at and type(trash_at) is not datetime.datetime: raise errors.ArgumentError("trash_at must be datetime type.") + if self._has_remote_blocks: + # Copy any remote blocks to the local cluster. + self._copy_remote_blocks(remote_blocks={}) + self._has_remote_blocks = False + self._my_block_manager().commit_all() text = self.manifest_text(strip=False) @@ -1627,6 +1707,9 @@ class Collection(RichCollectionBase): _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*') _segment_re = re.compile(r'(\d+):(\d+):(\S+)') + def _unescape_manifest_path(self, path): + return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path) + @synchronized def _import_manifest(self, manifest_text): """Import a manifest into a `Collection`. @@ -1651,7 +1734,7 @@ class Collection(RichCollectionBase): if state == STREAM_NAME: # starting a new stream - stream_name = tok.replace('\\040', ' ') + stream_name = self._unescape_manifest_path(tok) blocks = [] segments = [] streamoffset = 0 @@ -1673,13 +1756,18 @@ class Collection(RichCollectionBase): if file_segment: pos = int(file_segment.group(1)) size = int(file_segment.group(2)) - name = file_segment.group(3).replace('\\040', ' ') - filepath = os.path.join(stream_name, name) - afile = self.find_or_create(filepath, FILE) - if isinstance(afile, ArvadosFile): - afile.add_segment(blocks, pos, size) + name = self._unescape_manifest_path(file_segment.group(3)) + if name.split('/')[-1] == '.': + # placeholder for persisting an empty directory, not a real file + if len(name) > 2: + self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION) else: - raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath) + filepath = os.path.join(stream_name, name) + afile = self.find_or_create(filepath, FILE) + if isinstance(afile, ArvadosFile): + afile.add_segment(blocks, pos, size) + else: + raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath) else: # error! raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)