import copy
import errno
import re
+import logging
-from .errors import KeepWriteError, AssertionError
+from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range
from .retry import retry_method
+MOD = "mod"
+WRITE = "write"
+
+_logger = logging.getLogger('arvados.arvfile')
+
def split(path):
"""split(path) -> streamname, filename
bufferblock.append(self.buffer_view[0:self.size()])
return bufferblock
+ @synchronized
+ def clear(self):
+ self.owner = None
+ self.buffer_block = None
+ self.buffer_view = None
+
class NoopLock(object):
def __enter__(self):
@functools.wraps(orig_func)
def must_be_writable_wrapper(self, *args, **kwargs):
if not self.writable():
- raise IOError((errno.EROFS, "Collection must be writable."))
+ raise IOError(errno.EROFS, "Collection must be writable.")
return orig_func(self, *args, **kwargs)
return must_be_writable_wrapper
self._prefetch_threads = None
self._prefetch_queue = None
- def commit_bufferblock(self, block):
+ def commit_bufferblock(self, block, wait):
"""Initiate a background upload of a bufferblock.
- This will block if the upload queue is at capacity, otherwise it will
- return immediately.
+ :block:
+ The block object to upload
+
+ :wait:
+ If `wait` is True, upload the block synchronously.
+ If `wait` is False, upload the block asynchronously. This will
+ return immediately unless the upload queue is at capacity, in
+ which case it will wait on an upload queue slot.
"""
bufferblock = self._put_queue.get()
if bufferblock is None:
return
+
loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
if self._put_queue is not None:
self._put_queue.task_done()
- with self.lock:
- if self._put_threads is None:
- # Start uploader threads.
-
- # If we don't limit the Queue size, the upload queue can quickly
- # grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
- # servers.
- #
- # With two upload threads and a queue size of 2, this means up to 4
- # blocks pending. If they are full 64 MiB blocks, that means up to
- # 256 MiB of internal buffering, which is the same size as the
- # default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
- self._put_errors = Queue.Queue()
-
- self._put_threads = []
- for i in xrange(0, self.num_put_threads):
- thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
- self._put_threads.append(thread)
- thread.daemon = True
- thread.start()
+ if block.state() != _BufferBlock.WRITABLE:
+ return
+
+ if wait:
+ block.set_state(_BufferBlock.PENDING)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ block.set_state(_BufferBlock.COMMITTED, loc)
+ else:
+ with self.lock:
+ if self._put_threads is None:
+ # Start uploader threads.
+
+ # If we don't limit the Queue size, the upload queue can quickly
+ # grow to take up gigabytes of RAM if the writing process is
+ # generating data more quickly than it can be sent to the Keep
+ # servers.
+ #
+ # With two upload threads and a queue size of 2, this means up to 4
+ # blocks pending. If they are full 64 MiB blocks, that means up to
+ # 256 MiB of internal buffering, which is the same size as the
+ # default download block cache in KeepClient.
+ self._put_queue = Queue.Queue(maxsize=2)
+ self._put_errors = Queue.Queue()
+
+ self._put_threads = []
+ for i in xrange(0, self.num_put_threads):
+ thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
+ self._put_threads.append(thread)
+ thread.daemon = True
+ thread.start()
- if block.state() == _BufferBlock.WRITABLE:
# Mark the block as PENDING so to disallow any more appends.
block.set_state(_BufferBlock.PENDING)
self._put_queue.put(block)
def get_bufferblock(self, locator):
return self._bufferblocks.get(locator)
+ @synchronized
+ def delete_bufferblock(self, locator):
+ bb = self._bufferblocks[locator]
+ bb.clear()
+ del self._bufferblocks[locator]
+
def get_block_contents(self, locator, num_retries, cache_only=False):
"""Fetch a block.
items = self._bufferblocks.items()
for k,v in items:
- v.owner.flush()
+ if v.state() == _BufferBlock.WRITABLE:
+ v.owner.flush(False)
with self.lock:
if self._put_queue is not None:
pass
raise KeepWriteError("Error writing some blocks", err, label="block")
+ for k,v in items:
+ # flush again with wait=True to remove committed bufferblocks from
+ # the segments.
+ if v.owner:
+ v.owner.flush(True)
+
+
def block_prefetch(self, locator):
"""Initiate a background download of a block.
"""
- def __init__(self, parent, stream=[], segments=[]):
+ def __init__(self, parent, name, stream=[], segments=[]):
"""
ArvadosFile constructor.
a list of Range objects representing segments
"""
self.parent = parent
+ self.name = name
self._modified = True
self._segments = []
self.lock = parent.root_collection().lock
return copy.copy(self._segments)
@synchronized
- def clone(self, new_parent):
+ def clone(self, new_parent, new_name):
"""Make a copy of this file."""
- cp = ArvadosFile(new_parent)
+ cp = ArvadosFile(new_parent, new_name)
cp.replace_contents(self)
return cp
# segment is past the trucate size, all done
break
elif size < range_end:
- nr = Range(r.locator, r.range_start, size - r.range_start)
+ nr = Range(r.locator, r.range_start, size - r.range_start, 0)
nr.segment_offset = r.segment_offset
new_segs.append(nr)
break
self._segments = new_segs
self._modified = True
elif size > self.size():
- raise IOError("truncate() does not support extending the file size")
+ raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
- def readfrom(self, offset, size, num_retries):
- """Read upto `size` bytes from the file starting at `offset`."""
+ def readfrom(self, offset, size, num_retries, exact=False):
+ """Read up to `size` bytes from the file starting at `offset`.
+
+ :exact:
+ If False (default), return less data than requested if the read
+ crosses a block boundary and the next block isn't cached. If True,
+ only return less data than requested when hitting EOF.
+ """
with self.lock:
if size == 0 or offset >= self.size():
data = []
for lr in readsegs:
- block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
+ block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
if block:
data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
else:
raise ArgumentError("Offset is past the end of the file")
if len(data) > config.KEEP_BLOCK_SIZE:
- raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
+ # Chunk it up into smaller writes
+ n = 0
+ dataview = memoryview(data)
+ while n < len(data):
+ self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
+ n += config.KEEP_BLOCK_SIZE
+ return
self._modified = True
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
self._repack_writes(num_retries)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
self._current_bblock.append(data)
replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
+ self.parent.notify(WRITE, self.parent, self.name, (self, self))
+
+ return len(data)
+
@synchronized
- def flush(self, num_retries=0):
- if self._current_bblock:
- self._repack_writes(num_retries)
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+ def flush(self, wait=True, num_retries=0):
+ """Flush bufferblocks to Keep."""
+ if self.modified():
+ if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
+ self._repack_writes(num_retries)
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
+ if wait:
+ to_delete = set()
+ for s in self._segments:
+ bb = self.parent._my_block_manager().get_bufferblock(s.locator)
+ if bb:
+ if bb.state() != _BufferBlock.COMMITTED:
+ _logger.error("bufferblock %s is not committed" % (s.locator))
+ else:
+ to_delete.add(s.locator)
+ s.locator = bb.locator()
+ for s in to_delete:
+ self.parent._my_block_manager().delete_bufferblock(s)
+
+ self.parent.notify(MOD, self.parent, self.name, (self, self))
@must_be_writable
@synchronized
"""Internal implementation of add_segment."""
self._modified = True
for lr in locators_and_ranges(blocks, pos, size):
- last = self._segments[-1] if self._segments else Range(0, 0, 0)
+ last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
self._segments.append(r)
buf += "\n"
return buf
+ @must_be_writable
+ @synchronized
+ def _reparent(self, newparent, newname):
+ self._modified = True
+ self.flush()
+ self.parent.remove(self.name)
+ self.parent = newparent
+ self.name = newname
+ self.lock = self.parent.root_collection().lock
+
class ArvadosFileReader(ArvadosFileReaderBase):
"""Wraps ArvadosFile in a file-like object supporting reading only.
"""
- def __init__(self, arvadosfile, name, mode="r", num_retries=None):
- super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
+ def __init__(self, arvadosfile, mode="r", num_retries=None):
+ super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
self.arvadosfile = arvadosfile
def size(self):
@_FileLikeObjectBase._before_close
@retry_method
- def read(self, size, num_retries=None):
- """Read up to `size` bytes from the stream, starting at the current file position."""
- data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
- self._filepos += len(data)
- return data
+ def read(self, size=None, num_retries=None):
+ """Read up to `size` bytes from the file and return the result.
+
+ Starts at the current file position. If `size` is None, read the
+ entire remainder of the file.
+ """
+ if size is None:
+ data = []
+ rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
+ while rd:
+ data.append(rd)
+ self._filepos += len(rd)
+ rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
+ return ''.join(data)
+ else:
+ data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
+ self._filepos += len(data)
+ return data
@_FileLikeObjectBase._before_close
@retry_method
def readfrom(self, offset, size, num_retries=None):
- """Read up to `size` bytes from the stream, starting at the current file position."""
+ """Read up to `size` bytes from the stream, starting at the specified file offset.
+
+ This method does not change the file position.
+ """
return self.arvadosfile.readfrom(offset, size, num_retries)
def flush(self):
"""
- def __init__(self, arvadosfile, name, mode, num_retries=None):
- super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
+ def __init__(self, arvadosfile, mode, num_retries=None):
+ super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)
@_FileLikeObjectBase._before_close
@retry_method
else:
self.arvadosfile.writeto(self._filepos, data, num_retries)
self._filepos += len(data)
+ return len(data)
@_FileLikeObjectBase._before_close
@retry_method