-import functools
-import os
-import zlib
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+from future.utils import listitems, listvalues
+standard_library.install_aliases()
+from builtins import range
+from builtins import object
import bz2
-import config
-import hashlib
-import threading
-import Queue
+import collections
import copy
import errno
-import re
+import functools
+import hashlib
import logging
-import collections
+import os
+import queue
+import re
+import sys
+import threading
import uuid
+import zlib
+from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
class ArvadosFileReaderBase(_FileLikeObjectBase):
def __init__(self, name, mode, num_retries=None):
super(ArvadosFileReaderBase, self).__init__(name, mode)
- self._filepos = 0L
+ self._filepos = 0
self.num_retries = num_retries
self._readline_cache = (None, None)
yield data
def decompressed_name(self):
+ """Return self.name with a trailing '.bz2' or '.gz' extension removed."""
- return re.sub('\.(bz2|gz)$', '', self.name)
+ return re.sub(r'\.(bz2|gz)$', '', self.name)
@_FileLikeObjectBase._before_close
def seek(self, pos, whence=os.SEEK_SET):
pos += self._filepos
elif whence == os.SEEK_END:
pos += self.size()
- if pos < 0L:
+ if pos < 0:
raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
self._filepos = pos
return self._filepos
def readall(self, size=2**20, num_retries=None):
while True:
data = self.read(size, num_retries=num_retries)
- if data == '':
+ if len(data) == 0:
break
yield data
data = [cache_data]
self._filepos += len(cache_data)
else:
- data = ['']
+ data = [b'']
data_size = len(data[-1])
- while (data_size < size) and ('\n' not in data[-1]):
+ while (data_size < size) and (b'\n' not in data[-1]):
next_read = self.read(2 ** 20, num_retries=num_retries)
if not next_read:
break
data.append(next_read)
data_size += len(next_read)
- data = ''.join(data)
+ data = b''.join(data)
try:
- nextline_index = data.index('\n') + 1
+ nextline_index = data.index(b'\n') + 1
except ValueError:
nextline_index = len(data)
nextline_index = min(nextline_index, size)
self._filepos -= len(data) - nextline_index
self._readline_cache = (self.tell(), data[nextline_index:])
- return data[:nextline_index]
+ return data[:nextline_index].decode()
@_FileLikeObjectBase._before_close
@retry_method
data_size += len(s)
if data_size >= sizehint:
break
- return ''.join(data).splitlines(True)
+ return b''.join(data).decode().splitlines(True)
def size(self):
raise IOError(errno.ENOSYS, "Not implemented")
def read(self, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at the current file position"""
if size == 0:
- return ''
+ return b''
- data = ''
+ data = b''
available_chunks = locators_and_ranges(self.segments, self._filepos, size)
if available_chunks:
lr = available_chunks[0]
data = self._stream.readfrom(lr.locator+lr.segment_offset,
- lr.segment_size,
- num_retries=num_retries)
+ lr.segment_size,
+ num_retries=num_retries)
self._filepos += len(data)
return data
def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
- return ''
+ return b''
data = []
for lr in locators_and_ranges(self.segments, start, size):
data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
num_retries=num_retries))
- return ''.join(data)
+ return b''.join(data)
def as_manifest(self):
segs = []
"""
if self._state == _BufferBlock.WRITABLE:
+ if not isinstance(data, bytes) and not isinstance(data, memoryview):
+ data = data.encode()
while (self.write_pointer+len(data)) > len(self.buffer_block):
new_buffer_block = bytearray(len(self.buffer_block) * 2)
new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
"""
DEFAULT_PUT_THREADS = 2
- DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None):
+ def __init__(self, keep,
+ copies=None,
+ put_threads=None,
+ num_retries=None,
+ storage_classes_func=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
self._put_queue = None
self._put_threads = None
- self._prefetch_queue = None
- self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
- if put_threads:
- self.num_put_threads = put_threads
- else:
- self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
- self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+ self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
self.copies = copies
+ self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
self.threads_lock = threading.Lock()
self.padding_block = None
+ self.num_retries = num_retries
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
return
if self.copies is None:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
else:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
-
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
finally:
# If we don't limit the Queue size, the upload queue can quickly
# grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
+ # generating data more quickly than it can be sent to the Keep
# servers.
#
# With two upload threads and a queue size of 2, this means up to 4
# blocks pending. If they are full 64 MiB blocks, that means up to
# 256 MiB of internal buffering, which is the same size as the
# default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
+ self._put_queue = queue.Queue(maxsize=2)
self._put_threads = []
- for i in xrange(0, self.num_put_threads):
+ for i in range(0, self.num_put_threads):
thread = threading.Thread(target=self._commit_bufferblock_worker)
self._put_threads.append(thread)
thread.daemon = True
thread.start()
- def _block_prefetch_worker(self):
- """The background downloader thread."""
- while True:
- try:
- b = self._prefetch_queue.get()
- if b is None:
- return
- self._keep.get(b)
- except Exception:
- _logger.exception("Exception doing block prefetch")
-
- @synchronized
- def start_get_threads(self):
- if self._prefetch_threads is None:
- self._prefetch_queue = Queue.Queue()
- self._prefetch_threads = []
- for i in xrange(0, self.num_get_threads):
- thread = threading.Thread(target=self._block_prefetch_worker)
- self._prefetch_threads.append(thread)
- thread.daemon = True
- thread.start()
-
-
@synchronized
def stop_threads(self):
- """Shut down and wait for background upload and download threads to finish."""
+ """Shut down and wait for background upload threads to finish."""
self._put_threads = None
self._put_queue = None
- if self._prefetch_threads is not None:
- for t in self._prefetch_threads:
- self._prefetch_queue.put(None)
- for t in self._prefetch_threads:
- t.join()
- self._prefetch_threads = None
- self._prefetch_queue = None
-
def __enter__(self):
return self
if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
return
- # Search blocks ready for getting packed together before being committed to Keep.
+ # Search blocks ready for getting packed together before being
+ # committed to Keep.
# A WRITABLE block always has an owner.
- # A WRITABLE block with its owner.closed() implies that it's
+ # A WRITABLE block with its owner.closed() implies that its
# size is <= KEEP_BLOCK_SIZE/2.
try:
- small_blocks = [b for b in self._bufferblocks.values()
+ small_blocks = [b for b in listvalues(self._bufferblocks)
if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
except AttributeError:
# Writable blocks without owner shouldn't exist.
return
new_bb = self._alloc_bufferblock()
+ new_bb.owner = []
files = []
while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
bb = small_blocks.pop(0)
+ new_bb.owner.append(bb.owner)
self._pending_write_size -= bb.size()
new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
files.append((bb, new_bb.write_pointer - bb.size()))
newsegs = bb.owner.segments()
for s in newsegs:
if s.locator == bb.blockid:
- s.locator = new_bb.locator()
+ s.locator = new_bb.blockid
s.segment_offset = new_bb_segment_offset+s.segment_offset
bb.owner.set_segments(newsegs)
self._delete_bufferblock(bb.blockid)
if sync:
try:
if self.copies is None:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
else:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)
self._delete_bufferblock(locator)
def _delete_bufferblock(self, locator):
+ """Clear the buffer block stored under `locator` and drop it from self._bufferblocks.
+
+ A locator that is not present in self._bufferblocks is ignored.
+ """
- bb = self._bufferblocks[locator]
- bb.clear()
- del self._bufferblocks[locator]
+ if locator in self._bufferblocks:
+ bb = self._bufferblocks[locator]
+ bb.clear()
+ del self._bufferblocks[locator]
def get_block_contents(self, locator, num_retries, cache_only=False):
"""Fetch a block.
self.repack_small_blocks(force=True, sync=True)
with self.lock:
- items = self._bufferblocks.items()
+ items = listitems(self._bufferblocks)
for k,v in items:
if v.state() != _BufferBlock.COMMITTED and v.owner:
- v.owner.flush(sync=False)
+ # Ignore blocks with a list of owners, as if they're not in COMMITTED
+ # state, they're already being committed asynchronously.
+ if isinstance(v.owner, ArvadosFile):
+ v.owner.flush(sync=False)
with self.lock:
if self._put_queue is not None:
# flush again with sync=True to remove committed bufferblocks from
# the segments.
if v.owner:
- v.owner.flush(sync=True)
+ if isinstance(v.owner, ArvadosFile):
+ v.owner.flush(sync=True)
+ elif isinstance(v.owner, list) and len(v.owner) > 0:
+ # This bufferblock is referenced by many files as a result
+ # of repacking small blocks, so don't delete it when flushing
+ # its owners, just do it after flushing them all.
+ for owner in v.owner:
+ owner.flush(sync=True)
+ self.delete_bufferblock(k)
+
+ self.stop_threads()
def block_prefetch(self, locator):
"""Initiate a background download of a block.
-
- This assumes that the underlying KeepClient implements a block cache,
- so repeated requests for the same block will not result in repeated
- downloads (unless the block is evicted from the cache.) This method
- does not block.
-
"""
if not self.prefetch_enabled:
return
- if self._keep.get_from_cache(locator) is not None:
- return
-
with self.lock:
if locator in self._bufferblocks:
return
- self.start_get_threads()
- self._prefetch_queue.put(locator)
+ self._keep.block_prefetch(locator)
class ArvadosFile(object):
"""
+ __slots__ = ('parent', 'name', '_writers', '_committed',
+ '_segments', 'lock', '_current_bblock', 'fuse_entry')
+
def __init__(self, parent, name, stream=[], segments=[]):
"""
ArvadosFile constructor.
return True
return False
+    @synchronized
+    def has_remote_blocks(self):
+        """Return True if any segment's locator has a +R signature.
+
+        Returns False when no segment locator contains '+R' (including
+        when the file has no segments at all).
+        """
+        return any('+R' in seg.locator for seg in self._segments)
+
+    @synchronized
+    def _copy_remote_blocks(self, remote_blocks=None):
+        """Ask Keep to copy remote blocks and point to their local copies.
+
+        This is called from the parent Collection.
+
+        :remote_blocks:
+          Shared cache of remote to local block mappings. This is used to avoid
+          doing extra work when blocks are shared by more than one file in
+          different subdirectories. The dict is updated in place. When
+          omitted, a new empty dict is created for this call.
+
+        Returns the (possibly updated) remote_blocks mapping.
+        """
+        # A `{}` default is evaluated once and would be silently shared by
+        # every call that omits the argument. Test against None rather than
+        # truthiness so an empty dict supplied by the caller is still the
+        # object that gets filled in.
+        if remote_blocks is None:
+            remote_blocks = {}
+
+        for s in self._segments:
+            if '+R' in s.locator:
+                try:
+                    loc = remote_blocks[s.locator]
+                except KeyError:
+                    # Not seen before: ask Keep for the replacement locator
+                    # and remember it under the remote locator.
+                    loc = self.parent._my_keep().refresh_signature(s.locator)
+                    remote_blocks[s.locator] = loc
+                s.locator = loc
+                # The segment changed, so the parent is marked uncommitted.
+                self.parent.set_committed(False)
+        return remote_blocks
+
@synchronized
def segments(self):
return copy.copy(self._segments)
with self.lock:
if len(self._segments) != len(othersegs):
return False
- for i in xrange(0, len(othersegs)):
+ for i in range(0, len(othersegs)):
seg1 = self._segments[i]
seg2 = othersegs[i]
loc1 = seg1.locator
"""
self._writers.remove(writer)
- if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
+ if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
# File writer closed, not small enough for repacking
self.flush()
elif self.closed():
with self.lock:
if size == 0 or offset >= self.size():
- return ''
+ return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
locs = set()
data = []
self.parent._my_block_manager().block_prefetch(lr.locator)
locs.add(lr.locator)
- return ''.join(data)
+ return b''.join(data)
@must_be_writable
@synchronized
necessary.
"""
+ if not isinstance(data, bytes) and not isinstance(data, memoryview):
+ data = data.encode()
if len(data) == 0:
return
to_delete.add(s.locator)
s.locator = bb.locator()
for s in to_delete:
- self.parent._my_block_manager().delete_bufferblock(s)
+ # Don't delete the bufferblock if it's owned by many files. It'll be
+ # deleted after all of its owners are flush()ed.
+ if self.parent._my_block_manager().get_bufferblock(s).owner is self:
+ self.parent._my_block_manager().delete_bufferblock(s)
self.parent.notify(MOD, self.parent, self.name, (self, self))
"""
- def __init__(self, arvadosfile, num_retries=None):
- super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
+ def __init__(self, arvadosfile, mode="r", num_retries=None):
+ super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
self.arvadosfile = arvadosfile
def size(self):
def stream_name(self):
return self.arvadosfile.parent.stream_name()
+    def readinto(self, b):
+        """Read up to len(b) bytes with self.read() into the start of `b`.
+
+        Returns the number of bytes copied into the buffer.
+        """
+        chunk = self.read(len(b))
+        count = len(chunk)
+        b[:count] = chunk
+        return count
+
@_FileLikeObjectBase._before_close
@retry_method
def read(self, size=None, num_retries=None):
data.append(rd)
self._filepos += len(rd)
rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
- return ''.join(data)
+ return b''.join(data)
else:
data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
self._filepos += len(data)
"""
def __init__(self, arvadosfile, mode, num_retries=None):
- super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
- self.mode = mode
+ super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
self.arvadosfile.add_writer(self)
def writable(self):
@retry_method
def write(self, data, num_retries=None):
if self.mode[0] == "a":
- self.arvadosfile.writeto(self.size(), data, num_retries)
- else:
- self.arvadosfile.writeto(self._filepos, data, num_retries)
- self._filepos += len(data)
+ self._filepos = self.size()
+ self.arvadosfile.writeto(self._filepos, data, num_retries)
+ self._filepos += len(data)
return len(data)
@_FileLikeObjectBase._before_close
if not self.closed:
self.arvadosfile.remove_writer(self, flush)
super(ArvadosFileWriter, self).close()
+
+
+class WrappableFile(object):
+    """An interface to an Arvados file that's compatible with io wrappers.
+
+    Every method forwards to the wrapped object `f` and returns whatever
+    that call returns. `closed` is a plain attribute kept on the wrapper
+    itself: False on construction, True once close() has been called.
+    """
+
+    def __init__(self, f):
+        self.f = f
+        self.closed = False
+
+    def close(self):
+        """Mark the wrapper closed, then close the wrapped object."""
+        self.closed = True
+        return self.f.close()
+
+    def flush(self):
+        """Forward to the wrapped object's flush()."""
+        return self.f.flush()
+
+    def read(self, *args, **kwargs):
+        """Forward to the wrapped object's read()."""
+        return self.f.read(*args, **kwargs)
+
+    def readable(self):
+        """Forward to the wrapped object's readable()."""
+        return self.f.readable()
+
+    def readinto(self, *args, **kwargs):
+        """Forward to the wrapped object's readinto()."""
+        return self.f.readinto(*args, **kwargs)
+
+    def seek(self, *args, **kwargs):
+        """Forward to the wrapped object's seek()."""
+        return self.f.seek(*args, **kwargs)
+
+    def seekable(self):
+        """Forward to the wrapped object's seekable()."""
+        return self.f.seekable()
+
+    def tell(self):
+        """Forward to the wrapped object's tell()."""
+        return self.f.tell()
+
+    def writable(self):
+        """Forward to the wrapped object's writable()."""
+        return self.f.writable()
+
+    def write(self, *args, **kwargs):
+        """Forward to the wrapped object's write()."""
+        return self.f.write(*args, **kwargs)