From: Peter Amstutz Date: Tue, 16 Dec 2014 22:11:26 +0000 (-0500) Subject: 3198: Can use write to append only. X-Git-Tag: 1.1.0~1780^2~73 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/71959d0d8434b7af5de9a5b8260f0ebe11ec7238 3198: Can use write to append only. --- diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py index 55718d9e23..3f6461573f 100644 --- a/sdk/python/arvados/stream.py +++ b/sdk/python/arvados/stream.py @@ -100,6 +100,70 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False): i += 1 return resp +def replace_range(data_locators, range_start, range_size, new_locator, debug=False): + ''' + Replace a range with a new block. + data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous + range_start: start of range + range_size: size of range + new_locator: locator for new block to be inserted + !!! data_locators will be updated in place !!! + ''' + if range_size == 0: + return + + range_start = long(range_start) + range_size = long(range_size) + range_end = range_start + range_size + + last = data_locators[-1] + if (last[OFFSET]+last[BLOCKSIZE]) == range_start: + # append new block + data_locators.append([new_locator, range_size, range_start]) + return + + i = first_block(data_locators, range_start, range_size, debug) + if i is None: + return + + while i < len(data_locators): + locator, block_size, block_start = data_locators[i] + block_end = block_start + block_size + if debug: + print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end + if range_end <= block_start: + # range ends before this block starts, so don't look at any more locators + break + + #if range_start >= block_end: + # range starts after this block ends, so go to next block + # we should always start at the first block due to the binary above, so this test is redundant + #next + + if range_start >= block_start and range_end <= block_end: + # range starts and ends in this block + # split block into 3 pieces + #resp.append([locator, block_size, range_start - block_start, range_size]) + pass + elif range_start >= block_start and range_end > block_end: + # range starts in this block + # split block into 2 pieces + #resp.append([locator, block_size, range_start - block_start, block_end - range_start]) + pass + elif range_start < block_start and range_end > block_end: + # range starts in a previous block and extends to further blocks + # zero out this block + #resp.append([locator, block_size, 0L, block_size]) + pass + elif range_start < block_start and range_end <= block_end: + # range starts in a previous block and ends in this block + # split into 2 pieces + #resp.append([locator, block_size, 0L, range_end - block_start]) + pass + block_start = block_end + i += 1 + + def split(path): """split(path) -> streamname, filename @@ -148,15 +212,18 @@ class StreamFileReader(ArvadosFileBase): pos += self._filepos elif whence == os.SEEK_END: pos += self.size() - self._filepos = min(max(pos, 0L), self.size()) + self._filepos = min(max(pos, 0L), self._size()) def tell(self): return self._filepos - def size(self): + def _size(self): n = self.segments[-1] return n[OFFSET] + n[BLOCKSIZE] + def size(self): + return self._size() + @ArvadosFileBase._before_close @retry_method def read(self, size, num_retries=None): @@ -310,10 +377,13 @@ class StreamReader(object): def all_files(self): return self._files.values() - def size(self): + def _size(self): n = self._data_locators[-1] return n[OFFSET] + n[BLOCKSIZE] + def size(self): + return self._size() + def locators_and_ranges(self, range_start, range_size): return locators_and_ranges(self._data_locators, range_start, range_size) @@ -396,7 +466,8 @@ class StreamWriter(StreamReader): def _append(self, data): if self.current_bblock is None: - streamoffset = sum([x[1] for x in self._data_locators]) + last = self._data_locators[-1] + streamoffset = last[OFFSET] + last[BLOCKSIZE] self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset) self.bufferblocks[self.current_bblock.locator] = self.current_bblock self._data_locators.append(self.current_bblock.locator_list_entry) @@ -441,13 +512,17 @@ class StreamFileWriter(StreamFileReader): self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset]) streamoffset += blocksize fileoffset += segmentsize + if len(newstream) == 0: + newstream.append(config.EMPTY_BLOCK_LOCATOR) + self.segments.append([0, 0, 0]) self._stream._data_locators = newstream if self._filepos > fileoffset: self._filepos = fileoffset def _writeto(self, offset, data): - # TODO - pass + self._stream._append(data) + replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data)) + self._filepos += len(data) def writeto(self, offset, data): with self._stream.mutex: diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py index b5130cb743..f07ca6c7c1 100644 --- a/sdk/python/tests/test_stream.py +++ b/sdk/python/tests/test_stream.py @@ -307,5 +307,15 @@ class StreamFileWriterTestCase(unittest.TestCase): writer.truncate(8) self.assertEqual("567", writer.readfrom(5, 8)) + def test_append(self): + stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'], + keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})) + writer = stream.files()["count.txt"] + self.assertEqual("56789", writer.readfrom(5, 8)) + writer.seek(10) + writer.write("foo") + self.assertEqual("56789foo", writer.readfrom(5, 8)) + #print stream.manifest_text() + if __name__ == '__main__': unittest.main()