X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b8addea7d6c8c464a2743191561470332eeaba13..d6cccb3ea4e5f076a436d9935e3835d4b620b859:/sdk/python/arvados/collection.py diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 13301b2972..627f0346db 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -7,21 +7,23 @@ from future.utils import listitems, listvalues, viewkeys from builtins import str from past.builtins import basestring from builtins import object +import ciso8601 +import datetime +import errno import functools +import hashlib +import io import logging import os import re -import errno -import hashlib -import datetime -import ciso8601 -import time +import sys import threading +import time from collections import deque from stat import * -from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock +from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock from .keep import KeepLocator, KeepClient from .stream import StreamReader from ._normalize_stream import normalize_stream @@ -35,6 +37,21 @@ from arvados.retry import retry_method _logger = logging.getLogger('arvados.collection') + +if sys.version_info >= (3, 0): + TextIOWrapper = io.TextIOWrapper +else: + class TextIOWrapper(io.TextIOWrapper): + """To maintain backward compatibility, cast str to unicode in + write('foo'). + + """ + def write(self, data): + if isinstance(data, basestring): + data = unicode(data) + return super(TextIOWrapper, self).write(data) + + class CollectionBase(object): """Abstract base class for Collection classes.""" @@ -520,6 +537,7 @@ class RichCollectionBase(CollectionBase): def __init__(self, parent=None): self.parent = parent self._committed = False + self._has_remote_blocks = False self._callback = None self._items = {} @@ -544,6 +562,23 @@ class RichCollectionBase(CollectionBase): def stream_name(self): raise NotImplementedError() + @synchronized + def has_remote_blocks(self): + """Recursively check for a +R segment locator signature.""" + + if self._has_remote_blocks: + return True + for item in self: + if self[item].has_remote_blocks(): + return True + return False + + @synchronized + def set_has_remote_blocks(self, val): + self._has_remote_blocks = val + if self.parent: + self.parent.set_has_remote_blocks(val) + @must_be_writable @synchronized def find_or_create(self, path, create_type): @@ -636,7 +671,7 @@ class RichCollectionBase(CollectionBase): return self.find_or_create(path, COLLECTION) - def open(self, path, mode="r"): + def open(self, path, mode="r", encoding=None): """Open a file-like object for access. :path: @@ -658,6 +693,7 @@ class RichCollectionBase(CollectionBase): opens for reading and writing. All writes are appended to the end of the file. Writing does not affect the file pointer for reading. + """ if not re.search(r'^[rwa][bt]?\+?$', mode): @@ -680,7 +716,12 @@ class RichCollectionBase(CollectionBase): if mode[0] == 'w': arvfile.truncate(0) - return fclass(arvfile, mode=mode, num_retries=self.num_retries) + binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) + f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) + if 'b' not in mode: + bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader + f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) + return f def modified(self): """Determine if the collection has been modified since last commited.""" @@ -832,6 +873,8 @@ class RichCollectionBase(CollectionBase): self._items[target_name] = item self.set_committed(False) + if not self._has_remote_blocks and source_obj.has_remote_blocks(): + self.set_has_remote_blocks(True) if modified_from: self.notify(MOD, self, target_name, (modified_from, item)) @@ -1038,18 +1081,7 @@ class RichCollectionBase(CollectionBase): """ for item in self: - if isinstance(self[item], ArvadosFile): - for s in self[item].segments(): - if '+R' in s.locator: - try: - loc = remote_blocks[s.locator] - except KeyError: - loc = self._my_keep().refresh_signature(s.locator) - remote_blocks[s.locator] = loc - s.locator = loc - self.set_committed(False) - elif isinstance(self[item], RichCollectionBase): - remote_blocks = self[item]._copy_remote_blocks(remote_blocks) + remote_blocks = self[item]._copy_remote_blocks(remote_blocks) return remote_blocks @synchronized @@ -1286,8 +1318,12 @@ class Collection(RichCollectionBase): self._manifest_locator = manifest_locator_or_text elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): self._manifest_locator = manifest_locator_or_text + if not self._has_local_collection_uuid(): + self._has_remote_blocks = True elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): self._manifest_text = manifest_locator_or_text + if '+R' in self._manifest_text: + self._has_remote_blocks = True else: raise errors.ArgumentError( "Argument to CollectionReader is not a manifest or a collection UUID") @@ -1537,10 +1573,11 @@ class Collection(RichCollectionBase): t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") body["trash_at"] = t - # Copy any remote blocks to the local cluster. - self._copy_remote_blocks(remote_blocks={}) - if not self.committed(): + if self._has_remote_blocks: + # Copy any remote blocks to the local cluster. + self._copy_remote_blocks(remote_blocks={}) + self._has_remote_blocks = False if not self._has_collection_uuid(): raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.") elif not self._has_local_collection_uuid(): @@ -1629,8 +1666,10 @@ class Collection(RichCollectionBase): if trash_at and type(trash_at) is not datetime.datetime: raise errors.ArgumentError("trash_at must be datetime type.") - # Copy any remote blocks to the local cluster. - self._copy_remote_blocks(remote_blocks={}) + if self._has_remote_blocks: + # Copy any remote blocks to the local cluster. + self._copy_remote_blocks(remote_blocks={}) + self._has_remote_blocks = False self._my_block_manager().commit_all() text = self.manifest_text(strip=False) @@ -1668,6 +1707,9 @@ class Collection(RichCollectionBase): _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*') _segment_re = re.compile(r'(\d+):(\d+):(\S+)') + def _unescape_manifest_path(self, path): + return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path) + @synchronized def _import_manifest(self, manifest_text): """Import a manifest into a `Collection`. @@ -1692,7 +1734,7 @@ class Collection(RichCollectionBase): if state == STREAM_NAME: # starting a new stream - stream_name = tok.replace('\\040', ' ') + stream_name = self._unescape_manifest_path(tok) blocks = [] segments = [] streamoffset = 0 @@ -1714,13 +1756,18 @@ class Collection(RichCollectionBase): if file_segment: pos = int(file_segment.group(1)) size = int(file_segment.group(2)) - name = file_segment.group(3).replace('\\040', ' ') - filepath = os.path.join(stream_name, name) - afile = self.find_or_create(filepath, FILE) - if isinstance(afile, ArvadosFile): - afile.add_segment(blocks, pos, size) + name = self._unescape_manifest_path(file_segment.group(3)) + if name.split('/')[-1] == '.': + # placeholder for persisting an empty directory, not a real file + if len(name) > 2: + self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION) else: - raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath) + filepath = os.path.join(stream_name, name) + afile = self.find_or_create(filepath, FILE) + if isinstance(afile, ArvadosFile): + afile.add_segment(blocks, pos, size) + else: + raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath) else: # error! raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)