import errno
import re
import logging
+import collections
+import uuid
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
def __init__(self, keep, copies=None):
"""keep: KeepClient object to use"""
self._keep = keep
- self._bufferblocks = {}
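+ # An OrderedDict keeps buffer blocks in insertion order, so repack_small_blocks() visits small blocks in the order they were allocated.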
+ self._bufferblocks = collections.OrderedDict()
self._put_queue = None
self._put_threads = None
self._prefetch_queue = None
ArvadosFile that owns this block
"""
+ return self._alloc_bufferblock(blockid, starting_capacity, owner)
+
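+ # Unsynchronized helper shared by alloc_bufferblock() and repack_small_blocks(); callers are expected to already hold the block manager lock.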
+ def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
if blockid is None:
- blockid = "bufferblock%i" % len(self._bufferblocks)
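+ # A random UUID cannot collide with an existing key; the old len()-based name could repeat once blocks are deleted from _bufferblocks.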
+ blockid = "%s" % uuid.uuid4()
bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
self._bufferblocks[bufferblock.blockid] = bufferblock
return bufferblock
def repack_small_blocks(self, force=False, sync=False):
"""Packs small blocks together before uploading"""
# Search blocks ready for getting packed together before being committed to Keep.
+ # A WRITABLE block always has an owner.
+ # A WRITABLE block whose owner.closed() is True implies that its
+ # size is <= KEEP_BLOCK_SIZE/2.
small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
if len(small_blocks) <= 1:
# Not enough small blocks for repacking
return
# Check if there are enough small blocks for filling up one in full
pending_write_size = sum([b.size() for b in small_blocks])
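+ # Only repack when the small blocks add up to at least one full Keep block, unless the caller forces it.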
if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
- new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
- self._bufferblocks[new_bb.blockid] = new_bb
- size = 0
- while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+ new_bb = self._alloc_bufferblock()
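+ # Copy whole small blocks into new_bb until the next one would push it past KEEP_BLOCK_SIZE.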
+ while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
bb = small_blocks.pop(0)
- size += bb.size()
arvfile = bb.owner
new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
- arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())])
- bb.clear()
- del self._bufferblocks[bb.blockid]
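+ # new_bb.write_pointer has already advanced past bb's data, so the repacked segment starts at offset write_pointer - bb.size() within new_bb.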
+ arvfile.set_segments([Range(new_bb.blockid,
+ 0,
+ bb.size(),
+ new_bb.write_pointer - bb.size())])
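+ # bb's data now lives in new_bb; drop the old buffer block with the unsynchronized helper since the lock is already held here.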
+ self._delete_bufferblock(bb.blockid)
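+ # Upload the packed block to Keep, waiting for the commit to finish when sync=True.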
self.commit_bufferblock(new_bb, sync=sync)
def commit_bufferblock(self, block, sync):
@synchronized
def delete_bufferblock(self, locator):
+ self._delete_bufferblock(locator)
+
+ def _delete_bufferblock(self, locator):
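+ # Internal variant without @synchronized, for callers that already hold the lock (e.g. repack_small_blocks()).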
bb = self._bufferblocks[locator]
bb.clear()
del self._bufferblocks[locator]
self._writers.add(writer)
@synchronized
- def remove_writer(self, writer):
+ def remove_writer(self, writer, flush):
"""
Called from ArvadosFileWriter.close(). Remove a writer reference from the list
and do some block maintenance tasks.
"""
self._writers.remove(writer)
- if self.size() > config.KEEP_BLOCK_SIZE / 2:
+ if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
# File writer closed; flush was requested or the file is too big for repacking
self.flush()
elif self.closed():
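+ # All writers are closed and the file is small enough to be a candidate for repack_small_blocks().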
def flush(self):
self.arvadosfile.flush()
- def close(self):
+ def close(self, flush=True):
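+ # flush=True (the default) commits the file's data to Keep on close; flush=False lets a small file's buffer block stay WRITABLE so it can be repacked with other small files.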
if not self.closed:
- self.arvadosfile.remove_writer(self)
+ self.arvadosfile.remove_writer(self, flush)
super(ArvadosFileWriter, self).close()