X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/90209af8fa35bc99c9821db0c815404d1234ef31..58a026e09bda4c1e2374347615c325007c64fac4:/sdk/python/arvados/arvfile.py

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index a2ec76a076..f4580f346b 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -1,18 +1,30 @@
-import functools
-import os
-import zlib
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+from future.utils import listitems, listvalues
+standard_library.install_aliases()
+from builtins import range
+from builtins import object
 import bz2
-import config
-import hashlib
-import threading
-import Queue
+import collections
 import copy
 import errno
-import re
+import functools
+import hashlib
 import logging
-import collections
+import os
+import queue
+import re
+import sys
+import threading
 import uuid
+import zlib

+from . import config
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
 from ._normalize_stream import normalize_stream
@@ -76,7 +88,10 @@ class _FileLikeObjectBase(object):
 class ArvadosFileReaderBase(_FileLikeObjectBase):
     def __init__(self, name, mode, num_retries=None):
         super(ArvadosFileReaderBase, self).__init__(name, mode)
-        self._filepos = 0L
+        self._binary = 'b' in mode
+        if sys.version_info >= (3, 0) and not self._binary:
+            raise NotImplementedError("text mode {!r} is not implemented".format(mode))
+        self._filepos = 0
         self.num_retries = num_retries
         self._readline_cache = (None, None)

@@ -96,7 +111,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
             pos += self._filepos
         elif whence == os.SEEK_END:
             pos += self.size()
-        if pos < 0L:
+        if pos < 0:
             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
         self._filepos = pos
         return self._filepos
@@ -118,7 +133,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
     def readall(self, size=2**20, num_retries=None):
         while True:
             data = self.read(size, num_retries=num_retries)
-            if data == '':
+            if len(data) == 0:
                 break
             yield data

@@ -130,23 +145,23 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
             data = [cache_data]
             self._filepos += len(cache_data)
         else:
-            data = ['']
+            data = [b'']
         data_size = len(data[-1])
-        while (data_size < size) and ('\n' not in data[-1]):
+        while (data_size < size) and (b'\n' not in data[-1]):
             next_read = self.read(2 ** 20, num_retries=num_retries)
             if not next_read:
                 break
             data.append(next_read)
             data_size += len(next_read)
-        data = ''.join(data)
+        data = b''.join(data)
         try:
-            nextline_index = data.index('\n') + 1
+            nextline_index = data.index(b'\n') + 1
         except ValueError:
             nextline_index = len(data)
         nextline_index = min(nextline_index, size)
         self._filepos -= len(data) - nextline_index
         self._readline_cache = (self.tell(), data[nextline_index:])
-        return data[:nextline_index]
+        return data[:nextline_index].decode()

     @_FileLikeObjectBase._before_close
     @retry_method
@@ -181,7 +196,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
             data_size += len(s)
             if data_size >= sizehint:
                 break
-        return ''.join(data).splitlines(True)
+        return b''.join(data).decode().splitlines(True)

     def size(self):
         raise IOError(errno.ENOSYS, "Not implemented")
@@ -218,15 +233,15 @@ class StreamFileReader(ArvadosFileReaderBase):
     def read(self, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at the current file position"""
         if size == 0:
-            return ''
+            return b''

-        data = ''
+        data = b''
         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
         if available_chunks:
             lr = available_chunks[0]
             data = self._stream.readfrom(lr.locator+lr.segment_offset,
-                                          lr.segment_size,
-                                          num_retries=num_retries)
+                                         lr.segment_size,
+                                         num_retries=num_retries)

         self._filepos += len(data)
         return data
@@ -236,13 +251,13 @@ class StreamFileReader(ArvadosFileReaderBase):
     def readfrom(self, start, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at 'start'"""
         if size == 0:
-            return ''
+            return b''

         data = []
         for lr in locators_and_ranges(self.segments, start, size):
             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                               num_retries=num_retries))
-        return ''.join(data)
+        return b''.join(data)

     def as_manifest(self):
         segs = []
@@ -323,6 +338,8 @@ class _BufferBlock(object):

         """
         if self._state == _BufferBlock.WRITABLE:
+            if not isinstance(data, bytes) and not isinstance(data, memoryview):
+                data = data.encode()
             while (self.write_pointer+len(data)) > len(self.buffer_block):
                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
@@ -544,7 +561,6 @@ class _BlockManager(object):
                 else:
                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
-
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
             finally:
@@ -565,10 +581,10 @@ class _BlockManager(object):
             # blocks pending. If they are full 64 MiB blocks, that means up to
             # 256 MiB of internal buffering, which is the same size as the
             # default download block cache in KeepClient.
-            self._put_queue = Queue.Queue(maxsize=2)
+            self._put_queue = queue.Queue(maxsize=2)

             self._put_threads = []
-            for i in xrange(0, self.num_put_threads):
+            for i in range(0, self.num_put_threads):
                 thread = threading.Thread(target=self._commit_bufferblock_worker)
                 self._put_threads.append(thread)
                 thread.daemon = True
@@ -588,9 +604,9 @@ class _BlockManager(object):
     @synchronized
     def start_get_threads(self):
         if self._prefetch_threads is None:
-            self._prefetch_queue = Queue.Queue()
+            self._prefetch_queue = queue.Queue()
             self._prefetch_threads = []
-            for i in xrange(0, self.num_get_threads):
+            for i in range(0, self.num_get_threads):
                 thread = threading.Thread(target=self._block_prefetch_worker)
                 self._prefetch_threads.append(thread)
                 thread.daemon = True
@@ -633,12 +649,13 @@ class _BlockManager(object):
         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
             return

-        # Search blocks ready for getting packed together before being committed to Keep.
+        # Search blocks ready for getting packed together before being
+        # committed to Keep.
         # A WRITABLE block always has an owner.
-        # A WRITABLE block with its owner.closed() implies that it's
+        # A WRITABLE block with its owner.closed() implies that its
         # size is <= KEEP_BLOCK_SIZE/2.
         try:
-            small_blocks = [b for b in self._bufferblocks.values()
+            small_blocks = [b for b in listvalues(self._bufferblocks)
                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
         except AttributeError:
             # Writable blocks without owner shouldn't exist.
@@ -659,9 +676,11 @@ class _BlockManager(object):
             return

         new_bb = self._alloc_bufferblock()
+        new_bb.owner = []
         files = []
         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
             bb = small_blocks.pop(0)
+            new_bb.owner.append(bb.owner)
             self._pending_write_size -= bb.size()
             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
             files.append((bb, new_bb.write_pointer - bb.size()))
@@ -672,7 +691,7 @@ class _BlockManager(object):
             newsegs = bb.owner.segments()
             for s in newsegs:
                 if s.locator == bb.blockid:
-                    s.locator = new_bb.locator()
+                    s.locator = new_bb.blockid
                     s.segment_offset = new_bb_segment_offset+s.segment_offset
             bb.owner.set_segments(newsegs)
             self._delete_bufferblock(bb.blockid)
@@ -778,11 +797,14 @@ class _BlockManager(object):
         self.repack_small_blocks(force=True, sync=True)

         with self.lock:
-            items = self._bufferblocks.items()
+            items = listitems(self._bufferblocks)

         for k,v in items:
             if v.state() != _BufferBlock.COMMITTED and v.owner:
-                v.owner.flush(sync=False)
+                # Ignore blocks with a list of owners, as if they're not in COMMITTED
+                # state, they're already being committed asynchronously.
+                if isinstance(v.owner, ArvadosFile):
+                    v.owner.flush(sync=False)

         with self.lock:
             if self._put_queue is not None:
@@ -799,7 +821,15 @@ class _BlockManager(object):
             # flush again with sync=True to remove committed bufferblocks from
             # the segments.
             if v.owner:
-                v.owner.flush(sync=True)
+                if isinstance(v.owner, ArvadosFile):
+                    v.owner.flush(sync=True)
+                elif isinstance(v.owner, list) and len(v.owner) > 0:
+                    # This bufferblock is referenced by many files as a result
+                    # of repacking small blocks, so don't delete it when flushing
+                    # its owners, just do it after flushing them all.
+                    for owner in v.owner:
+                        owner.flush(sync=True)
+                    self.delete_bufferblock(k)

     def block_prefetch(self, locator):
         """Initiate a background download of a block.
@@ -836,6 +866,9 @@ class ArvadosFile(object):

     """

+    __slots__ = ('parent', 'name', '_writers', '_committed',
+                 '_segments', 'lock', '_current_bblock', 'fuse_entry')
+
     def __init__(self, parent, name, stream=[], segments=[]):
         """
         ArvadosFile constructor.
@@ -910,7 +943,7 @@ class ArvadosFile(object):
         with self.lock:
             if len(self._segments) != len(othersegs):
                 return False
-            for i in xrange(0, len(othersegs)):
+            for i in range(0, len(othersegs)):
                 seg1 = self._segments[i]
                 seg2 = othersegs[i]
                 loc1 = seg1.locator
@@ -970,7 +1003,7 @@ class ArvadosFile(object):
         """
         self._writers.remove(writer)

-        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
+        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
             # File writer closed, not small enough for repacking
             self.flush()
         elif self.closed():
@@ -1035,7 +1068,7 @@ class ArvadosFile(object):

         with self.lock:
             if size == 0 or offset >= self.size():
-                return ''
+                return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

@@ -1055,7 +1088,7 @@ class ArvadosFile(object):
                 self.parent._my_block_manager().block_prefetch(lr.locator)
                 locs.add(lr.locator)

-        return ''.join(data)
+        return b''.join(data)

     @must_be_writable
     @synchronized
@@ -1066,6 +1099,8 @@ class ArvadosFile(object):
         necessary.
""" + if not isinstance(data, bytes) and not isinstance(data, memoryview): + data = data.encode() if len(data) == 0: return @@ -1128,7 +1163,10 @@ class ArvadosFile(object): to_delete.add(s.locator) s.locator = bb.locator() for s in to_delete: - self.parent._my_block_manager().delete_bufferblock(s) + # Don't delete the bufferblock if it's owned by many files. It'll be + # deleted after all of its owners are flush()ed. + if self.parent._my_block_manager().get_bufferblock(s).owner is self: + self.parent._my_block_manager().delete_bufferblock(s) self.parent.notify(MOD, self.parent, self.name, (self, self)) @@ -1198,8 +1236,8 @@ class ArvadosFileReader(ArvadosFileReaderBase): """ - def __init__(self, arvadosfile, num_retries=None): - super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries) + def __init__(self, arvadosfile, mode="r", num_retries=None): + super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries) self.arvadosfile = arvadosfile def size(self): @@ -1223,7 +1261,7 @@ class ArvadosFileReader(ArvadosFileReaderBase): data.append(rd) self._filepos += len(rd) rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries) - return ''.join(data) + return b''.join(data) else: data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True) self._filepos += len(data) @@ -1251,8 +1289,7 @@ class ArvadosFileWriter(ArvadosFileReader): """ def __init__(self, arvadosfile, mode, num_retries=None): - super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries) - self.mode = mode + super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries) self.arvadosfile.add_writer(self) def writable(self): @@ -1262,10 +1299,9 @@ class ArvadosFileWriter(ArvadosFileReader): @retry_method def write(self, data, num_retries=None): if self.mode[0] == "a": - self.arvadosfile.writeto(self.size(), data, num_retries) - else: - self.arvadosfile.writeto(self._filepos, data, num_retries) - self._filepos += len(data) + self._filepos = self.size() + self.arvadosfile.writeto(self._filepos, data, num_retries) + self._filepos += len(data) return len(data) @_FileLikeObjectBase._before_close