-import functools
-import os
-import zlib
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+from future.utils import listitems, listvalues
+standard_library.install_aliases()
+from builtins import range
+from builtins import object
import bz2
-import config
-import hashlib
-import threading
-import Queue
+import collections
import copy
import errno
-import re
+import functools
+import hashlib
import logging
-import collections
+import os
+import queue
+import re
+import sys
+import threading
import uuid
+import zlib
+from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
stream_name, file_name = '.', path
return stream_name, file_name
+
+class UnownedBlockError(Exception):
+    """Raised when there is a writable block without an owner on the BlockManager."""
+ pass
+
+
class _FileLikeObjectBase(object):
def __init__(self, name, mode):
self.name = name
class ArvadosFileReaderBase(_FileLikeObjectBase):
def __init__(self, name, mode, num_retries=None):
super(ArvadosFileReaderBase, self).__init__(name, mode)
- self._filepos = 0L
+ self._binary = 'b' in mode
+ if sys.version_info >= (3, 0) and not self._binary:
+ raise NotImplementedError("text mode {!r} is not implemented".format(mode))
+ self._filepos = 0
self.num_retries = num_retries
self._readline_cache = (None, None)
pos += self._filepos
elif whence == os.SEEK_END:
pos += self.size()
- self._filepos = min(max(pos, 0L), self.size())
+ self._filepos = min(max(pos, 0), self.size())
def tell(self):
return self._filepos
def readall(self, size=2**20, num_retries=None):
while True:
data = self.read(size, num_retries=num_retries)
- if data == '':
+ if len(data) == 0:
break
yield data
data = [cache_data]
self._filepos += len(cache_data)
else:
- data = ['']
+ data = [b'']
data_size = len(data[-1])
- while (data_size < size) and ('\n' not in data[-1]):
+ while (data_size < size) and (b'\n' not in data[-1]):
next_read = self.read(2 ** 20, num_retries=num_retries)
if not next_read:
break
data.append(next_read)
data_size += len(next_read)
- data = ''.join(data)
+ data = b''.join(data)
try:
- nextline_index = data.index('\n') + 1
+ nextline_index = data.index(b'\n') + 1
except ValueError:
nextline_index = len(data)
nextline_index = min(nextline_index, size)
self._filepos -= len(data) - nextline_index
self._readline_cache = (self.tell(), data[nextline_index:])
- return data[:nextline_index]
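+        # readline() returns text: decode the accumulated bytes before returning.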
+ return data[:nextline_index].decode()
@_FileLikeObjectBase._before_close
@retry_method
data_size += len(s)
if data_size >= sizehint:
break
- return ''.join(data).splitlines(True)
+ return b''.join(data).decode().splitlines(True)
def size(self):
raise NotImplementedError()
def read(self, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at the current file position"""
if size == 0:
- return ''
+ return b''
- data = ''
+ data = b''
available_chunks = locators_and_ranges(self.segments, self._filepos, size)
if available_chunks:
lr = available_chunks[0]
data = self._stream.readfrom(lr.locator+lr.segment_offset,
- lr.segment_size,
- num_retries=num_retries)
+ lr.segment_size,
+ num_retries=num_retries)
self._filepos += len(data)
return data
def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
- return ''
+ return b''
data = []
for lr in locators_and_ranges(self.segments, start, size):
data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
num_retries=num_retries))
- return ''.join(data)
+ return b''.join(data)
def as_manifest(self):
segs = []
"""
if self._state == _BufferBlock.WRITABLE:
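+            # Accept either bytes or text; text is encoded to bytes before being buffered.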
+ if not isinstance(data, bytes) and not isinstance(data, memoryview):
+ data = data.encode()
while (self.write_pointer+len(data)) > len(self.buffer_block):
new_buffer_block = bytearray(len(self.buffer_block) * 2)
new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None):
+ def __init__(self, keep, copies=None, put_threads=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
- self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+ if put_threads:
+ self.num_put_threads = put_threads
+ else:
+ self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
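+        # Estimated bytes in closed small files that are waiting to be packed
+        # together and committed (see repack_small_blocks()).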
+ self._pending_write_size = 0
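+        # Lock used to start the put threads; kept separate from self.lock so
+        # start_put_threads() can be called from code paths that may already
+        # hold the main lock.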
+ self.threads_lock = threading.Lock()
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
ArvadosFile that owns this block
"""
+ return self._alloc_bufferblock(blockid, starting_capacity, owner)
+
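+    # Unsynchronized implementation of alloc_bufferblock(), for callers that
+    # already hold self.lock (e.g. repack_small_blocks()).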
+ def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
if blockid is None:
blockid = "%s" % uuid.uuid4()
bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
if self._put_queue is not None:
self._put_queue.task_done()
- @synchronized
def start_put_threads(self):
- if self._put_threads is None:
- # Start uploader threads.
-
- # If we don't limit the Queue size, the upload queue can quickly
- # grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
- # servers.
- #
- # With two upload threads and a queue size of 2, this means up to 4
- # blocks pending. If they are full 64 MiB blocks, that means up to
- # 256 MiB of internal buffering, which is the same size as the
- # default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
-
- self._put_threads = []
- for i in xrange(0, self.num_put_threads):
- thread = threading.Thread(target=self._commit_bufferblock_worker)
- self._put_threads.append(thread)
- thread.daemon = True
- thread.start()
+ with self.threads_lock:
+ if self._put_threads is None:
+ # Start uploader threads.
+
+ # If we don't limit the Queue size, the upload queue can quickly
+ # grow to take up gigabytes of RAM if the writing process is
+                # generating data more quickly than it can be sent to the Keep
+ # servers.
+ #
+ # With two upload threads and a queue size of 2, this means up to 4
+ # blocks pending. If they are full 64 MiB blocks, that means up to
+ # 256 MiB of internal buffering, which is the same size as the
+ # default download block cache in KeepClient.
+ self._put_queue = queue.Queue(maxsize=2)
+
+ self._put_threads = []
+ for i in range(0, self.num_put_threads):
+ thread = threading.Thread(target=self._commit_bufferblock_worker)
+ self._put_threads.append(thread)
+ thread.daemon = True
+ thread.start()
def _block_prefetch_worker(self):
"""The background downloader thread."""
return
self._keep.get(b)
except Exception:
- pass
+ _logger.exception("Exception doing block prefetch")
@synchronized
def start_get_threads(self):
if self._prefetch_threads is None:
- self._prefetch_queue = Queue.Queue()
+ self._prefetch_queue = queue.Queue()
self._prefetch_threads = []
- for i in xrange(0, self.num_get_threads):
+ for i in range(0, self.num_get_threads):
thread = threading.Thread(target=self._block_prefetch_worker)
self._prefetch_threads.append(thread)
thread.daemon = True
self.stop_threads()
@synchronized
- def repack_small_blocks(self, force=False, sync=False):
+ def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
"""Packs small blocks together before uploading"""
- # Search blocks ready for getting packed together before being committed to Keep.
- small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
- if len(small_blocks) <= 1:
- # Not enough small blocks for repacking
- return
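+        # Keep a running tally of bytes in closed small files so the buffer
+        # block list is only scanned when there may be enough data to fill a
+        # full block.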
+ self._pending_write_size += closed_file_size
# Check if there are enough small blocks for filling up one in full
- pending_write_size = sum([b.size() for b in small_blocks])
- if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
- new_bb = _BufferBlock("%s" % uuid.uuid4(), 2**14, None)
- self._bufferblocks[new_bb.blockid] = new_bb
+ if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+
+            # Search for blocks that are ready to be packed together before being committed to Keep.
+            # A WRITABLE block always has an owner.
+            # A WRITABLE block with its owner.closed() implies that its
+            # size is <= KEEP_BLOCK_SIZE/2.
+ try:
+ small_blocks = [b for b in listvalues(self._bufferblocks) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+ except AttributeError:
+                # Writable blocks without an owner shouldn't exist.
+ raise UnownedBlockError()
+
+ if len(small_blocks) <= 1:
+ # Not enough small blocks for repacking
+ return
+
+ # Update the pending write size count with its true value, just in case
+ # some small file was opened, written and closed several times.
+ self._pending_write_size = sum([b.size() for b in small_blocks])
+ if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+ return
+
+ new_bb = self._alloc_bufferblock()
while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
bb = small_blocks.pop(0)
arvfile = bb.owner
+ self._pending_write_size -= bb.size()
new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
arvfile.set_segments([Range(new_bb.blockid,
0,
bb.size(),
new_bb.write_pointer - bb.size())])
- bb.clear()
- del self._bufferblocks[bb.blockid]
+ self._delete_bufferblock(bb.blockid)
self.commit_bufferblock(new_bb, sync=sync)
def commit_bufferblock(self, block, sync):
@synchronized
def delete_bufferblock(self, locator):
+ self._delete_bufferblock(locator)
+
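+    # Unsynchronized implementation of delete_bufferblock(), for callers that
+    # already hold self.lock (e.g. repack_small_blocks()).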
+ def _delete_bufferblock(self, locator):
bb = self._bufferblocks[locator]
bb.clear()
del self._bufferblocks[locator]
self.repack_small_blocks(force=True, sync=True)
with self.lock:
- items = self._bufferblocks.items()
+ items = listitems(self._bufferblocks)
for k,v in items:
if v.state() != _BufferBlock.COMMITTED and v.owner:
def writable(self):
return self.parent.writable()
+ @synchronized
+ def permission_expired(self, as_of_dt=None):
+        """Returns True if any of the segments' locators has expired"""
+ for r in self._segments:
+ if KeepLocator(r.locator).permission_expired(as_of_dt):
+ return True
+ return False
+
@synchronized
def segments(self):
return copy.copy(self._segments)
self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
- self._committed = False
+ self.set_committed(False)
def __eq__(self, other):
if other is self:
with self.lock:
if len(self._segments) != len(othersegs):
return False
- for i in xrange(0, len(othersegs)):
+ for i in range(0, len(othersegs)):
seg1 = self._segments[i]
seg2 = othersegs[i]
loc1 = seg1.locator
self._segments = segs
@synchronized
- def set_committed(self):
- """Set committed flag to True"""
- self._committed = True
+ def set_committed(self, value=True):
+ """Set committed flag.
+
+ If value is True, set committed to be True.
+
+ If value is False, set committed to be False for this and all parents.
+ """
+ if value == self._committed:
+ return
+ self._committed = value
+ if self._committed is False and self.parent is not None:
+ self.parent.set_committed(False)
@synchronized
def committed(self):
self._writers.add(writer)
@synchronized
- def remove_writer(self, writer):
+ def remove_writer(self, writer, flush):
"""
Called from ArvadosFileWriter.close(). Remove a writer reference from the list
and do some block maintenance tasks.
"""
self._writers.remove(writer)
- if self.size() > config.KEEP_BLOCK_SIZE / 2:
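+        # flush=True forces an immediate flush even when the file is small
+        # enough to be packed together with other small files.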
+ if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
# File writer closed, not small enough for repacking
self.flush()
elif self.closed():
# All writers closed and size is adequate for repacking
- self.parent._my_block_manager().repack_small_blocks()
+ self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
def closed(self):
"""
new_segs.append(r)
self._segments = new_segs
- self._committed = False
+ self.set_committed(False)
elif size > self.size():
raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
with self.lock:
if size == 0 or offset >= self.size():
- return ''
+ return b''
readsegs = locators_and_ranges(self._segments, offset, size)
prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
self.parent._my_block_manager().block_prefetch(lr.locator)
locs.add(lr.locator)
- return ''.join(data)
+ return b''.join(data)
def _repack_writes(self, num_retries):
"""Test if the buffer block has more data than actual segments.
necessary.
"""
+ if not isinstance(data, bytes) and not isinstance(data, memoryview):
+ data = data.encode()
if len(data) == 0:
return
n += config.KEEP_BLOCK_SIZE
return
- self._committed = False
+ self.set_committed(False)
if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
def _add_segment(self, blocks, pos, size):
"""Internal implementation of add_segment."""
- self._committed = False
+ self.set_committed(False)
for lr in locators_and_ranges(blocks, pos, size):
last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
return 0
@synchronized
- def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+ def manifest_text(self, stream_name=".", portable_locators=False,
+ normalize=False, only_committed=False):
buf = ""
filestream = []
for segment in self.segments:
loc = segment.locator
- if loc.startswith("bufferblock"):
+ if self.parent._my_block_manager().is_bufferblock(loc):
+ if only_committed:
+ continue
loc = self._bufferblocks[loc].calculate_locator()
if portable_locators:
loc = KeepLocator(loc).stripped()
@must_be_writable
@synchronized
def _reparent(self, newparent, newname):
- self._committed = False
+ self.set_committed(False)
self.flush(sync=True)
self.parent.remove(self.name)
self.parent = newparent
"""
- def __init__(self, arvadosfile, num_retries=None):
- super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
+ def __init__(self, arvadosfile, mode="r", num_retries=None):
+ super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
self.arvadosfile = arvadosfile
def size(self):
data.append(rd)
self._filepos += len(rd)
rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
- return ''.join(data)
+ return b''.join(data)
else:
data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
self._filepos += len(data)
"""
def __init__(self, arvadosfile, mode, num_retries=None):
- super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
- self.mode = mode
+ super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
self.arvadosfile.add_writer(self)
@_FileLikeObjectBase._before_close
def flush(self):
self.arvadosfile.flush()
- def close(self):
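+    # With flush=False, small files are left for the block manager to pack
+    # together instead of being flushed individually on close.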
+ def close(self, flush=True):
if not self.closed:
- self.arvadosfile.remove_writer(self)
+ self.arvadosfile.remove_writer(self, flush)
super(ArvadosFileWriter, self).close()