import os
import zlib
import bz2
-from ._ranges import locators_and_ranges, replace_range, Range
-from arvados.retry import retry_method
import config
import hashlib
import threading
import Queue
import copy
import errno
+import re
+
from .errors import KeepWriteError, AssertionError
from .keep import KeepLocator
-from _normalize_stream import normalize_stream
+from ._normalize_stream import normalize_stream
+from ._ranges import locators_and_ranges, replace_range, Range
+from .retry import retry_method
def split(path):
"""split(path) -> streamname, filename
return re.sub('\.(bz2|gz)$', '', self.name)
@_FileLikeObjectBase._before_close
- def seek(self, pos, whence=os.SEEK_CUR):
+ def seek(self, pos, whence=os.SEEK_SET):
if whence == os.SEEK_CUR:
pos += self._filepos
elif whence == os.SEEK_END:
self.buffer_view = None
self.buffer_block = None
else:
- raise AssertionError("Invalid state change from %s to %s" % (self.state, state))
+ raise AssertionError("Invalid state change from %s to %s" % (self.state, nextstate))
@synchronized
def state(self):
thread.daemon = True
thread.start()
- # Mark the block as PENDING so to disallow any more appends.
- block.set_state(_BufferBlock.PENDING)
- self._put_queue.put(block)
+ if block.state() == _BufferBlock.WRITABLE:
+ # Mark the block as PENDING so to disallow any more appends.
+ block.set_state(_BufferBlock.PENDING)
+ self._put_queue.put(block)
@synchronized
def get_bufferblock(self, locator):
items = self._bufferblocks.items()
for k,v in items:
- if v.state() == _BufferBlock.WRITABLE:
- self.commit_bufferblock(v)
+ v.owner.flush()
with self.lock:
if self._put_queue is not None:
break
return ''.join(data)
- def _repack_writes(self):
+ def _repack_writes(self, num_retries):
"""Test if the buffer block has more data than actual segments.
This happens when a buffered write over-writes a file range written in
if write_total < self._current_bblock.size():
# There is more data in the buffer block than is actually accounted for by segments, so
# re-pack into a new buffer by copying over to a new buffer block.
+ contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
for t in bufferblock_segs:
- new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
+ new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
t.segment_offset = new_bb.size() - t.range_size
self._current_bblock = new_bb
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self._repack_writes()
+ self._repack_writes(num_retries)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
@synchronized
- def flush(self):
+ def flush(self, num_retries=0):
if self._current_bblock:
- self._repack_writes()
+ self._repack_writes(num_retries)
self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
@must_be_writable