From: Tom Clegg Date: Tue, 25 Jun 2013 15:47:49 +0000 (-0400) Subject: add transparent gunzip support, fix string concatenation performance X-Git-Tag: 1.1.0~3168 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/a5b113fa6ebecc747ced67f97a84e7f134680b6d add transparent gunzip support, fix string concatenation performance --- diff --git a/sdk/python/arvados.py b/sdk/python/arvados.py index 336cec9907..7e5c4ccc09 100644 --- a/sdk/python/arvados.py +++ b/sdk/python/arvados.py @@ -12,6 +12,7 @@ import re import hashlib import string import bz2 +import zlib from apiclient import errors from apiclient.discovery import build @@ -138,6 +139,8 @@ class StreamFileReader: self._filepos = 0 def name(self): return self._name + def decompressed_name(self): + return re.sub('\.(bz2|gz)$', '', self._name) def size(self): return self._size def stream_name(self): @@ -159,10 +162,18 @@ class StreamFileReader: data = decompressor.decompress(chunk) if data and data != '': yield data + def gunzip(self, size): + decompressor = zlib.decompressobj(16+zlib.MAX_WBITS) + for chunk in self.readall(size): + data = decompressor.decompress(decompressor.unconsumed_tail + chunk) + if data and data != '': + yield data def readlines(self, decompress=True): self._stream.seek(self._pos + self._filepos) if decompress and re.search('\.bz2$', self._name): datasource = self.bunzip2(2**10) + elif decompress and re.search('\.gz$', self._name): + datasource = self.gunzip(2**10) else: datasource = self.readall(2**10) data = '' @@ -330,7 +341,8 @@ class CollectionReader: class CollectionWriter: KEEP_BLOCK_SIZE = 2**26 def __init__(self): - self._data_buffer = '' + self._data_buffer = [] + self._data_buffer_len = 0 self._current_stream_files = [] self._current_stream_length = 0 self._current_stream_locators = [] @@ -343,14 +355,16 @@ class CollectionWriter: def __exit__(self): self.finish() def write(self, newdata): - self._data_buffer += newdata + self._data_buffer += [newdata] + self._data_buffer_len += len(newdata) self._current_stream_length += len(newdata) - while len(self._data_buffer) >= self.KEEP_BLOCK_SIZE: + while self._data_buffer_len >= self.KEEP_BLOCK_SIZE: self.flush_data() def flush_data(self): - if self._data_buffer != '': - self._current_stream_locators += [Keep.put(self._data_buffer[0:self.KEEP_BLOCK_SIZE])] - self._data_buffer = self._data_buffer[self.KEEP_BLOCK_SIZE:] + data_buffer = ''.join(self._data_buffer) + if data_buffer != '': + self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])] + self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]] def start_new_file(self, newfilename=None): self.finish_current_file() self.set_current_file_name(newfilename) diff --git a/sdk/python/test_collections.py b/sdk/python/test_collections.py index 64a08058bf..7335961c35 100644 --- a/sdk/python/test_collections.py +++ b/sdk/python/test_collections.py @@ -7,6 +7,7 @@ import arvados import os import bz2 import sys +import subprocess class KeepLocalStoreTest(unittest.TestCase): def setUp(self): @@ -175,3 +176,32 @@ class LocalCollectionBZ2DecompressionTest(unittest.TestCase): self.assertEqual(got, n_lines_in, "decompression returned %d lines instead of %d" % (got, n_lines_in)) + +class LocalCollectionGzipDecompressionTest(unittest.TestCase): + def setUp(self): + os.environ['KEEP_LOCAL_STORE'] = '/tmp' + def runTest(self): + n_lines_in = 2**18 + data_in = "abc\n" + for x in xrange(0, 18): + data_in += data_in + p = subprocess.Popen(["gzip", "-1cn"], + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=False, close_fds=True) + compressed_data_in, stderrdata = p.communicate(data_in) + + cw = arvados.CollectionWriter() + cw.start_new_file('test.gz') + cw.write(compressed_data_in) + gzip_manifest = cw.manifest_text() + + cr = arvados.CollectionReader(gzip_manifest) + got = 0 + for x in list(cr.all_files())[0].readlines(): + self.assertEqual(x, "abc\n", "decompression returned wrong data: %s" % x) + got += 1 + self.assertEqual(got, + n_lines_in, + "decompression returned %d lines instead of %d" % (got, n_lines_in))