err.append(self._put_errors.get(False))
except Queue.Empty:
pass
- raise KeepWriteError("Error writing some blocks", err)
+ raise KeepWriteError("Error writing some blocks", err, label="block")
def block_prefetch(self, locator):
"""Initiate a background download of a block.
replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
+ @synchronized
+ def flush(self):
+ if self._current_bblock:
+ self._repack_writes()
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+
+
@must_be_writable
@synchronized
def add_segment(self, blocks, pos, size):
for s in seq:
self.write(s, num_retries)
+ @_FileLikeObjectBase._before_close
def truncate(self, size=None):
if size is None:
size = self._filepos
self.arvadosfile.truncate(size)
if self._filepos > self.size():
self._filepos = self.size()
+
+ @_FileLikeObjectBase._before_close
+ def flush(self):
+ self.arvadosfile.flush()
+
+ @_FileLikeObjectBase._before_close
+ def close(self):
+ self.flush()
+ super(ArvadosFileWriter, self).close()