import re
import logging
-from .errors import KeepWriteError, AssertionError
+from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range
from .retry import retry_method
MOD = "mod"
+WRITE = "write"
_logger = logging.getLogger('arvados.arvfile')
pass
raise KeepWriteError("Error writing some blocks", err, label="block")
+ for k,v in items:
+ # flush again with wait=True to remove committed bufferblocks from
+ # the segments.
+ if v.owner:
+ v.owner.flush(True)
+
+
def block_prefetch(self, locator):
"""Initiate a background download of a block.
raise ArgumentError("Offset is past the end of the file")
if len(data) > config.KEEP_BLOCK_SIZE:
- raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
+ # Chunk it up into smaller writes
+ n = 0
+ dataview = memoryview(data)
+ while n < len(data):
+ self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
+ n += config.KEEP_BLOCK_SIZE
+ return
self._modified = True
replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
- self.parent.notify(MOD, self.parent, self.name, (self, self))
+ self.parent.notify(WRITE, self.parent, self.name, (self, self))
return len(data)
@synchronized
def flush(self, wait=True, num_retries=0):
+ """Flush bufferblocks to Keep."""
if self.modified():
if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
self._repack_writes(num_retries)
@must_be_writable
@synchronized
- def reparent(self, newparent, newname):
+ def _reparent(self, newparent, newname):
self._modified = True
self.flush()
self.parent.remove(self.name)
-
self.parent = newparent
self.name = newname
self.lock = self.parent.root_collection().lock