From 96d2964dcbc02c3873084ee09183f0f5fb0c44ba Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 17 Dec 2014 16:33:06 -0500 Subject: [PATCH] 3198: Working on optimizing rewrites --- sdk/python/arvados/arvfile.py | 1 - sdk/python/arvados/stream.py | 50 ++++++++++++++++++++++++++++----- sdk/python/tests/test_stream.py | 40 +++++++++++++++++++++----- 3 files changed, 76 insertions(+), 15 deletions(-) diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index ef7a6c88d9..768a5ebef5 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -244,7 +244,6 @@ class StreamFileWriter(StreamFileReader): def _writeto(self, offset, data): self._stream._append(data) replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data)) - self._filepos += len(data) def writeto(self, offset, data): with self._stream.mutex: diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py index 8623ab93d5..0dec9e4f2a 100644 --- a/sdk/python/arvados/stream.py +++ b/sdk/python/arvados/stream.py @@ -149,14 +149,19 @@ class StreamReader(object): class BufferBlock(object): - def __init__(self, locator, streamoffset): + def __init__(self, locator, streamoffset, starting_size=2**16): self.locator = locator - self.buffer_block = bytearray(config.KEEP_BLOCK_SIZE) + self.buffer_block = bytearray(starting_size) self.buffer_view = memoryview(self.buffer_block) self.write_pointer = 0 self.locator_list_entry = [locator, 0, streamoffset] def append(self, data): + while (self.write_pointer+len(data)) > len(self.buffer_block): + new_buffer_block = bytearray(len(self.buffer_block) * 2) + new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer] + self.buffer_block = new_buffer_block + self.buffer_view = memoryview(self.buffer_block) self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data self.write_pointer += len(data) self.locator_list_entry[1] = self.write_pointer @@ -196,13 +201,44 @@ class StreamWriter(StreamReader): else: return self._keep.get(locator, num_retries=num_retries) + def _init_bufferblock(self): + last = self._data_locators[-1] + streamoffset = last[OFFSET] + last[BLOCKSIZE] + if last[BLOCKSIZE] == 0: + del self._data_locators[-1] + self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset) + self.bufferblocks[self.current_bblock.locator] = self.current_bblock + self._data_locators.append(self.current_bblock.locator_list_entry) + + def _commit(self): + # commit buffer block + + segs = self._files.values()[0].segments + print "segs %s bb %s" % (segs, self.current_bblock.locator_list_entry) + final_writes = [s for s in segs if s[LOCATOR] >= self.current_bblock.locator_list_entry[OFFSET]] + print "final_writes %s" % final_writes + # if size of final_writes < size of buffer block ... + + # TODO: do 'put' in the background? + pdh = self._keep.put(self.current_bblock.buffer_block[0:self.current_bblock.write_pointer]) + self._data_locators[-1][0] = pdh + self.current_bblock = None + + def commit(self): + with self.mutex: + self._commit() + def _append(self, data): + if len(data) > config.KEEP_BLOCK_SIZE: + raise ArgumentError("Please append data chunks smaller than config.KEEP_BLOCK_SIZE") + if self.current_bblock is None: - last = self._data_locators[-1] - streamoffset = last[OFFSET] + last[BLOCKSIZE] - self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset) - self.bufferblocks[self.current_bblock.locator] = self.current_bblock - self._data_locators.append(self.current_bblock.locator_list_entry) + self._init_bufferblock() + + if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE: + self._commit() + self._init_bufferblock() + self.current_bblock.append(data) def append(self, data): diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py index 30f08bacb6..0af274db3c 100644 --- a/sdk/python/tests/test_stream.py +++ b/sdk/python/tests/test_stream.py @@ -6,6 +6,7 @@ import io import mock import os import unittest +import hashlib import arvados from arvados import StreamReader, StreamFileReader, StreamWriter, StreamFileWriter @@ -278,6 +279,10 @@ class StreamWriterTestCase(unittest.TestCase): self.blocks = blocks def get(self, locator, num_retries=0): return self.blocks[locator] + def put(self, data): + pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data)) + self.blocks[pdh] = data + return pdh def test_init(self): stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'], @@ -293,12 +298,6 @@ class StreamWriterTestCase(unittest.TestCase): class StreamFileWriterTestCase(unittest.TestCase): - class MockKeep(object): - def __init__(self, blocks): - self.blocks = blocks - def get(self, locator, num_retries=0): - return self.blocks[locator] - def test_truncate(self): stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'], keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})) @@ -314,6 +313,7 @@ class StreamFileWriterTestCase(unittest.TestCase): self.assertEqual("56789", writer.readfrom(5, 8)) writer.seek(10) writer.write("foo") + self.assertEqual(writer.size(), 13) self.assertEqual("56789foo", writer.readfrom(5, 8)) def test_write0(self): @@ -323,6 +323,7 @@ class StreamFileWriterTestCase(unittest.TestCase): self.assertEqual("0123456789", writer.readfrom(0, 13)) writer.seek(0) writer.write("foo") + self.assertEqual(writer.size(), 10) self.assertEqual("foo3456789", writer.readfrom(0, 13)) self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 10:3:count.txt 3:7:count.txt\n", stream.manifest_text()) @@ -333,6 +334,7 @@ class StreamFileWriterTestCase(unittest.TestCase): self.assertEqual("0123456789", writer.readfrom(0, 13)) writer.seek(3) writer.write("foo") + self.assertEqual(writer.size(), 10) self.assertEqual("012foo6789", writer.readfrom(0, 13)) self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:3:count.txt 10:3:count.txt 6:4:count.txt\n", stream.manifest_text()) @@ -343,6 +345,7 @@ class StreamFileWriterTestCase(unittest.TestCase): self.assertEqual("0123456789", writer.readfrom(0, 13)) writer.seek(7) writer.write("foo") + self.assertEqual(writer.size(), 10) self.assertEqual("0123456foo", writer.readfrom(0, 13)) self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:7:count.txt 10:3:count.txt\n", stream.manifest_text()) @@ -353,7 +356,7 @@ class StreamFileWriterTestCase(unittest.TestCase): self.assertEqual("012345678901234", writer.readfrom(0, 15)) writer.seek(7) writer.write("foobar") - print stream.manifest_text() + self.assertEqual(writer.size(), 20) self.assertEqual("0123456foobar34", writer.readfrom(0, 15)) self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:7:count.txt 10:6:count.txt 3:7:count.txt\n", stream.manifest_text()) @@ -364,8 +367,31 @@ class StreamFileWriterTestCase(unittest.TestCase): self.assertEqual("012301230123", writer.readfrom(0, 15)) writer.seek(2) writer.write("abcdefg") + self.assertEqual(writer.size(), 12) self.assertEqual("01abcdefg123", writer.readfrom(0, 15)) self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:2:count.txt 10:7:count.txt 1:3:count.txt\n", stream.manifest_text()) + def test_write_large(self): + stream = StreamWriter(['.', arvados.config.EMPTY_BLOCK_LOCATOR, '0:0:count.txt'], + keep=StreamWriterTestCase.MockKeep({})) + writer = stream.files()["count.txt"] + text = ''.join(["0123456789" for a in xrange(0, 100)]) + for b in xrange(0, 100000): + writer.write(text) + self.assertEqual(writer.size(), 100000000) + stream.commit() + self.assertEqual(". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n", stream.manifest_text()) + + def test_write_rewrite(self): + stream = StreamWriter(['.', arvados.config.EMPTY_BLOCK_LOCATOR, '0:0:count.txt'], + keep=StreamWriterTestCase.MockKeep({})) + writer = stream.files()["count.txt"] + for b in xrange(0, 10): + writer.seek(0, os.SEEK_SET) + writer.write("0123456789") + stream.commit() + self.assertEqual(writer.size(), 10) + self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", stream.manifest_text()) + if __name__ == '__main__': unittest.main() -- 2.30.2