import copy
import errno
import re
+import logging
-from .errors import KeepWriteError, AssertionError
+from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range
from .retry import retry_method
MOD = "mod"
+WRITE = "write"
+
+_logger = logging.getLogger('arvados.arvfile')
def split(path):
"""split(path) -> streamname, filename
self.buffer_view = None
self.buffer_block = None
else:
- raise AssertionError("Invalid state change from %s to %s" % (self.state, state))
+ raise AssertionError("Invalid state change from %s to %s" % (self.state, nextstate))
@synchronized
def state(self):
bufferblock.append(self.buffer_view[0:self.size()])
return bufferblock
+ @synchronized
+ def clear(self):
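+ # Drop references to the owner and buffers so the memory can be reclaimed.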
+ self.owner = None
+ self.buffer_block = None
+ self.buffer_view = None
+
class NoopLock(object):
def __enter__(self):
@functools.wraps(orig_func)
def must_be_writable_wrapper(self, *args, **kwargs):
if not self.writable():
- raise IOError((errno.EROFS, "Collection must be writable."))
+ raise IOError(errno.EROFS, "Collection must be writable.")
return orig_func(self, *args, **kwargs)
return must_be_writable_wrapper
self._prefetch_threads = None
self._prefetch_queue = None
- def commit_bufferblock(self, block):
+ def commit_bufferblock(self, block, wait):
"""Initiate a background upload of a bufferblock.
- This will block if the upload queue is at capacity, otherwise it will
- return immediately.
+ :block:
+ The block object to upload
+
+ :wait:
+ If `wait` is True, upload the block synchronously.
+ If `wait` is False, upload the block asynchronously. This will
+ return immediately unless the upload queue is at capacity, in
+ which case it will wait on an upload queue slot.
"""
bufferblock = self._put_queue.get()
if bufferblock is None:
return
+
loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
if self._put_queue is not None:
self._put_queue.task_done()
- with self.lock:
- if self._put_threads is None:
- # Start uploader threads.
-
- # If we don't limit the Queue size, the upload queue can quickly
- # grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
- # servers.
- #
- # With two upload threads and a queue size of 2, this means up to 4
- # blocks pending. If they are full 64 MiB blocks, that means up to
- # 256 MiB of internal buffering, which is the same size as the
- # default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
- self._put_errors = Queue.Queue()
-
- self._put_threads = []
- for i in xrange(0, self.num_put_threads):
- thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
- self._put_threads.append(thread)
- thread.daemon = True
- thread.start()
- # Mark the block as PENDING so to disallow any more appends.
- block.set_state(_BufferBlock.PENDING)
- self._put_queue.put(block)
+ if block.state() != _BufferBlock.WRITABLE:
+ return
+ if wait:
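+ # Synchronous case: do the Keep PUT in the calling thread and return
+ # once the block is COMMITTED.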
+ block.set_state(_BufferBlock.PENDING)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ block.set_state(_BufferBlock.COMMITTED, loc)
+ else:
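+ # Asynchronous case: queue the block for the background uploader
+ # threads, starting them on first use.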
+ with self.lock:
+ if self._put_threads is None:
+ # Start uploader threads.
+
+ # If we don't limit the Queue size, the upload queue can quickly
+ # grow to take up gigabytes of RAM if the writing process is
+ # generating data more quickly than it can be sent to the Keep
+ # servers.
+ #
+ # With two upload threads and a queue size of 2, this means up to 4
+ # blocks pending. If they are full 64 MiB blocks, that means up to
+ # 256 MiB of internal buffering, which is the same size as the
+ # default download block cache in KeepClient.
+ self._put_queue = Queue.Queue(maxsize=2)
+ self._put_errors = Queue.Queue()
+
+ self._put_threads = []
+ for i in xrange(0, self.num_put_threads):
+ thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
+ self._put_threads.append(thread)
+ thread.daemon = True
+ thread.start()
+
+ # Mark the block as PENDING to disallow any more appends.
+ block.set_state(_BufferBlock.PENDING)
+ self._put_queue.put(block)
@synchronized
def get_bufferblock(self, locator):
return self._bufferblocks.get(locator)
+ @synchronized
+ def delete_bufferblock(self, locator):
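+ # Forget the bufferblock and release its buffers; callers should only
+ # do this once the block's permanent locator has been recorded.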
+ bb = self._bufferblocks[locator]
+ bb.clear()
+ del self._bufferblocks[locator]
+
def get_block_contents(self, locator, num_retries, cache_only=False):
"""Fetch a block.
for k,v in items:
if v.state() == _BufferBlock.WRITABLE:
- self.commit_bufferblock(v)
+ v.owner.flush(False)
with self.lock:
if self._put_queue is not None:
pass
raise KeepWriteError("Error writing some blocks", err, label="block")
+ for k,v in items:
+ # Flush again with wait=True so each segment's locator is updated to
+ # the committed block's permanent locator and the bufferblock is released.
+ if v.owner:
+ v.owner.flush(True)
+
+
def block_prefetch(self, locator):
"""Initiate a background download of a block.
a list of Range objects representing segments
"""
self.parent = parent
+ self.name = name
self._modified = True
self._segments = []
self.lock = parent.root_collection().lock
for s in segments:
self._add_segment(stream, s.locator, s.range_size)
self._current_bblock = None
- self.name = name
def writable(self):
return self.parent.writable()
self._segments = new_segs
self._modified = True
elif size > self.size():
- raise IOError("truncate() does not support extending the file size")
+ raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
+
- def readfrom(self, offset, size, num_retries):
- """Read upto `size` bytes from the file starting at `offset`."""
+ def readfrom(self, offset, size, num_retries, exact=False):
+ """Read up to `size` bytes from the file starting at `offset`.
+
+ :exact:
+ If False (default), return less data than requested if the read
+ crosses a block boundary and the next block isn't cached. If True,
+ only return less data than requested when hitting EOF.
+ """
with self.lock:
if size == 0 or offset >= self.size():
data = []
for lr in readsegs:
- block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
+ block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
if block:
data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
else:
break
return ''.join(data)
- def _repack_writes(self):
+ def _repack_writes(self, num_retries):
"""Test if the buffer block has more data than actual segments.
This happens when a buffered write over-writes a file range written in
if write_total < self._current_bblock.size():
# There is more data in the buffer block than is actually accounted for by segments, so
# re-pack into a new buffer by copying over to a new buffer block.
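+ # Read back through the block manager rather than the raw buffer,
+ # since the current block may already have been committed to Keep.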
+ contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
for t in bufferblock_segs:
- new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
+ new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
t.segment_offset = new_bb.size() - t.range_size
self._current_bblock = new_bb
raise ArgumentError("Offset is past the end of the file")
if len(data) > config.KEEP_BLOCK_SIZE:
- raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
+ # Chunk it up into smaller writes
+ n = 0
+ dataview = memoryview(data)
+ while n < len(data):
+ self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
+ n += config.KEEP_BLOCK_SIZE
+ return
self._modified = True
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self._repack_writes()
+ self._repack_writes(num_retries)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
self._current_bblock.append(data)
replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
+ self.parent.notify(WRITE, self.parent, self.name, (self, self))
+
+ return len(data)
+
@synchronized
- def flush(self):
- if self._current_bblock:
- self._repack_writes()
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
- self.parent.notify(MOD, self.parent, self.name, (self, self))
+ def flush(self, wait=True, num_retries=0):
+ """Flush bufferblocks to Keep."""
+ if self.modified():
+ if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
+ self._repack_writes(num_retries)
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
+ if wait:
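+ # Replace the bufferblock locators in our segments with the permanent
+ # Keep locators, then drop the committed bufferblocks.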
+ to_delete = set()
+ for s in self._segments:
+ bb = self.parent._my_block_manager().get_bufferblock(s.locator)
+ if bb:
+ if bb.state() != _BufferBlock.COMMITTED:
+ _logger.error("bufferblock %s is not committed", s.locator)
+ else:
+ to_delete.add(s.locator)
+ s.locator = bb.locator()
+ for s in to_delete:
+ self.parent._my_block_manager().delete_bufferblock(s)
+
+ self.parent.notify(MOD, self.parent, self.name, (self, self))
@must_be_writable
@synchronized
buf += "\n"
return buf
+ @must_be_writable
+ @synchronized
+ def _reparent(self, newparent, newname):
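+ # Flush pending writes, then detach from the old parent and attach
+ # under the new parent and name.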
+ self._modified = True
+ self.flush()
+ self.parent.remove(self.name)
+ self.parent = newparent
+ self.name = newname
+ self.lock = self.parent.root_collection().lock
+
class ArvadosFileReader(ArvadosFileReaderBase):
"""Wraps ArvadosFile in a file-like object supporting reading only.
@_FileLikeObjectBase._before_close
@retry_method
- def read(self, size, num_retries=None):
- """Read up to `size` bytes from the stream, starting at the current file position."""
- data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
- self._filepos += len(data)
- return data
+ def read(self, size=None, num_retries=None):
+ """Read up to `size` bytes from the file and return the result.
+
+ Starts at the current file position. If `size` is None, read the
+ entire remainder of the file.
+ """
+ if size is None:
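+ # No size given: read block-sized chunks until EOF.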
+ data = []
+ rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
+ while rd:
+ data.append(rd)
+ self._filepos += len(rd)
+ rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
+ return ''.join(data)
+ else:
+ data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
+ self._filepos += len(data)
+ return data
@_FileLikeObjectBase._before_close
@retry_method
def readfrom(self, offset, size, num_retries=None):
- """Read up to `size` bytes from the stream, starting at the current file position."""
+ """Read up to `size` bytes from the stream, starting at the specified file offset.
+
+ This method does not change the file position.
+ """
return self.arvadosfile.readfrom(offset, size, num_retries)
def flush(self):
else:
self.arvadosfile.writeto(self._filepos, data, num_retries)
self._filepos += len(data)
+ return len(data)
@_FileLikeObjectBase._before_close
@retry_method