self._prefetch_threads = None
self._prefetch_queue = None
- def commit_bufferblock(self, block):
+ def commit_bufferblock(self, block, wait):
"""Initiate a background upload of a bufferblock.
- This will block if the upload queue is at capacity, otherwise it will
- return immediately.
+ :block:
+ The block object to upload
+
+ :wait:
+ If `wait` is True, upload the block synchronously.
+ If `wait` is False, upload the block asynchronously. This will
+ return immediately unless if the upload queue is at capacity, in
+ which case it will wait on an upload queue slot.
"""
bufferblock = self._put_queue.get()
if bufferblock is None:
return
+
loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
if self._put_queue is not None:
self._put_queue.task_done()
- with self.lock:
- if self._put_threads is None:
- # Start uploader threads.
-
- # If we don't limit the Queue size, the upload queue can quickly
- # grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
- # servers.
- #
- # With two upload threads and a queue size of 2, this means up to 4
- # blocks pending. If they are full 64 MiB blocks, that means up to
- # 256 MiB of internal buffering, which is the same size as the
- # default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
- self._put_errors = Queue.Queue()
-
- self._put_threads = []
- for i in xrange(0, self.num_put_threads):
- thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
- self._put_threads.append(thread)
- thread.daemon = True
- thread.start()
+ if block.state() != _BufferBlock.WRITABLE:
+ return
- # Mark the block as PENDING so to disallow any more appends.
- block.set_state(_BufferBlock.PENDING)
- self._put_queue.put(block)
+ if wait:
+ block.set_state(_BufferBlock.PENDING)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ block.set_state(_BufferBlock.COMMITTED, loc)
+ else:
+ with self.lock:
+ if self._put_threads is None:
+ # Start uploader threads.
+
+ # If we don't limit the Queue size, the upload queue can quickly
+ # grow to take up gigabytes of RAM if the writing process is
+ # generating data more quickly than it can be send to the Keep
+ # servers.
+ #
+ # With two upload threads and a queue size of 2, this means up to 4
+ # blocks pending. If they are full 64 MiB blocks, that means up to
+ # 256 MiB of internal buffering, which is the same size as the
+ # default download block cache in KeepClient.
+ self._put_queue = Queue.Queue(maxsize=2)
+ self._put_errors = Queue.Queue()
+
+ self._put_threads = []
+ for i in xrange(0, self.num_put_threads):
+ thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
+ self._put_threads.append(thread)
+ thread.daemon = True
+ thread.start()
+
+ # Mark the block as PENDING so to disallow any more appends.
+ block.set_state(_BufferBlock.PENDING)
+ self._put_queue.put(block)
@synchronized
def get_bufferblock(self, locator):
for k,v in items:
if v.state() == _BufferBlock.WRITABLE:
- self.commit_bufferblock(v)
+ v.owner.flush(False)
with self.lock:
if self._put_queue is not None:
a list of Range objects representing segments
"""
self.parent = parent
+ self.name = name
self._modified = True
self._segments = []
self.lock = parent.root_collection().lock
for s in segments:
self._add_segment(stream, s.locator, s.range_size)
self._current_bblock = None
- self.name = name
def writable(self):
return self.parent.writable()
elif size > self.size():
raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
- def readfrom(self, offset, size, num_retries):
- """Read upto `size` bytes from the file starting at `offset`."""
+ def readfrom(self, offset, size, num_retries, exact=False):
+ """Read upto `size` bytes from the file starting at `offset`.
+
+ :exact:
+ If False (default), return less data than requested if the read
+ crosses a block boundary and the next block isn't cached. If True,
+ only return less data than requested when hitting EOF.
+ """
with self.lock:
if size == 0 or offset >= self.size():
data = []
for lr in readsegs:
- block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
+ block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
if block:
data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
else:
break
return ''.join(data)
- def _repack_writes(self):
+ def _repack_writes(self, num_retries):
"""Test if the buffer block has more data than actual segments.
This happens when a buffered write over-writes a file range written in
if write_total < self._current_bblock.size():
# There is more data in the buffer block than is actually accounted for by segments, so
# re-pack into a new buffer by copying over to a new buffer block.
+ contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
for t in bufferblock_segs:
- new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
+ new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
t.segment_offset = new_bb.size() - t.range_size
self._current_bblock = new_bb
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self._repack_writes()
+ self._repack_writes(num_retries)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
self._current_bblock.append(data)
return len(data)
@synchronized
- def flush(self):
+ def flush(self, wait=True, num_retries=0):
if self.modified():
if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
- self._repack_writes()
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+ self._repack_writes(num_retries)
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
self.parent.notify(MOD, self.parent, self.name, (self, self))
@must_be_writable
buf += "\n"
return buf
+ @must_be_writable
+ @synchronized
+ def reparent(self, newparent, newname):
+ self.flush()
+ self.parent.remove(self.name)
+
+ self.parent = newparent
+ self.name = newname
+ self.lock = self.parent.root_collection().lock
+ self._modified = True
class ArvadosFileReader(ArvadosFileReaderBase):
"""Wraps ArvadosFile in a file-like object supporting reading only.