add transparent gunzip support, fix string concatenation performance
author: Tom Clegg <tom@clinicalfuture.com>
Tue, 25 Jun 2013 15:47:49 +0000 (11:47 -0400)
committer: Tom Clegg <tom@clinicalfuture.com>
Tue, 25 Jun 2013 20:51:43 +0000 (16:51 -0400)
sdk/python/arvados.py
sdk/python/test_collections.py

index 336cec99078e2dfa95dc9f1ed5411617dc50894a..7e5c4ccc0923da693ce5366bc12cfa5a22a56d5e 100644 (file)
@@ -12,6 +12,7 @@ import re
 import hashlib
 import string
 import bz2
+import zlib
 
 from apiclient import errors
 from apiclient.discovery import build
@@ -138,6 +139,12 @@ class StreamFileReader:
         self._filepos = 0
     def name(self):
         return self._name
+    def decompressed_name(self):
+        """Return the file name minus a trailing .bz2 or .gz suffix.
+
+        readlines() decompresses files with these suffixes (decompress=True).
+        """
+        return re.sub(r'\.(bz2|gz)$', '', self._name)
     def size(self):
         return self._size
     def stream_name(self):
@@ -159,10 +162,36 @@ class StreamFileReader:
             data = decompressor.decompress(chunk)
             if data and data != '':
                 yield data
+    def gunzip(self, size):
+        """Yield the decompressed contents of this gzip file.
+
+        Compressed input is read via self.readall(size). A gzip file may
+        consist of several concatenated members (e.g. `cat a.gz b.gz`);
+        zlib stops at the end of each member and leaves the bytes after it
+        in unused_data, so a fresh decompressor is started for every
+        member instead of silently dropping all but the first one.
+        Trailing bytes that are not gzip data may raise zlib.error.
+        """
+        # 16+MAX_WBITS: expect a gzip (not bare zlib) header and trailer.
+        decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
+        for chunk in self.readall(size):
+            while chunk:
+                data = decompressor.decompress(chunk)
+                if data:
+                    yield data
+                # unused_data becomes non-empty only once a member has ended.
+                chunk = decompressor.unused_data
+                if chunk:
+                    decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
+        data = decompressor.flush()
+        if data:
+            yield data
     def readlines(self, decompress=True):
         self._stream.seek(self._pos + self._filepos)
         if decompress and re.search('\.bz2$', self._name):
             datasource = self.bunzip2(2**10)
+        elif decompress and re.search('\.gz$', self._name):
+            datasource = self.gunzip(2**10)
         else:
             datasource = self.readall(2**10)
         data = ''
@@ -330,7 +341,8 @@ class CollectionReader:
 class CollectionWriter:
     KEEP_BLOCK_SIZE = 2**26
     def __init__(self):
-        self._data_buffer = ''
+        self._data_buffer = []
+        self._data_buffer_len = 0
         self._current_stream_files = []
         self._current_stream_length = 0
         self._current_stream_locators = []
@@ -343,14 +355,28 @@ class CollectionWriter:
     def __exit__(self):
         self.finish()
     def write(self, newdata):
-        self._data_buffer += newdata
+        """Buffer newdata, storing a Keep block each time one fills up.
+
+        Pieces are kept in a list and joined only in flush_data(), so many
+        small writes avoid quadratic string concatenation.
+        """
+        self._data_buffer.append(newdata)
+        self._data_buffer_len += len(newdata)
         self._current_stream_length += len(newdata)
-        while len(self._data_buffer) >= self.KEEP_BLOCK_SIZE:
+        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
             self.flush_data()
     def flush_data(self):
-        if self._data_buffer != '':
-            self._current_stream_locators += [Keep.put(self._data_buffer[0:self.KEEP_BLOCK_SIZE])]
-            self._data_buffer = self._data_buffer[self.KEEP_BLOCK_SIZE:]
+        """Store up to KEEP_BLOCK_SIZE buffered bytes as one Keep block.
+
+        The remainder stays buffered, and _data_buffer_len is reset to its
+        length: write() loops on _data_buffer_len, so leaving it stale
+        would make that loop spin forever once the buffer is drained.
+        """
+        data_buffer = ''.join(self._data_buffer)
+        if data_buffer != '':
+            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
+            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
+            self._data_buffer_len = len(self._data_buffer[0])
     def start_new_file(self, newfilename=None):
         self.finish_current_file()
         self.set_current_file_name(newfilename)
index 64a08058bfb42d8e064ae350f9ebc4d6f96661bc..7335961c3521d69097703e7ec7ad8a5f721407a9 100644 (file)
@@ -7,6 +7,7 @@ import arvados
 import os
 import bz2
 import sys
+import subprocess
 
 class KeepLocalStoreTest(unittest.TestCase):
     def setUp(self):
@@ -175,3 +176,35 @@ class LocalCollectionBZ2DecompressionTest(unittest.TestCase):
         self.assertEqual(got,
                          n_lines_in,
                          "decompression returned %d lines instead of %d" % (got, n_lines_in))
+
+class LocalCollectionGzipDecompressionTest(unittest.TestCase):
+    """Check that readlines() transparently gunzips a file named *.gz."""
+    def setUp(self):
+        os.environ['KEEP_LOCAL_STORE'] = '/tmp'
+    def runTest(self):
+        n_lines_in = 2**18
+        data_in = "abc\n" * n_lines_in
+        # Compress with the external gzip tool (fastest level, no stored
+        # name/timestamp) so the reader is tested against real gzip output.
+        p = subprocess.Popen(["gzip", "-1cn"],
+                             stdout=subprocess.PIPE,
+                             stdin=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             shell=False, close_fds=True)
+        compressed_data_in, stderrdata = p.communicate(data_in)
+        self.assertEqual(p.returncode, 0,
+                         "gzip exited %d: %s" % (p.returncode, stderrdata))
+
+        cw = arvados.CollectionWriter()
+        cw.start_new_file('test.gz')
+        cw.write(compressed_data_in)
+        gzip_manifest = cw.manifest_text()
+
+        cr = arvados.CollectionReader(gzip_manifest)
+        got = 0
+        for x in list(cr.all_files())[0].readlines():
+            self.assertEqual(x, "abc\n", "decompression returned wrong data: %s" % x)
+            got += 1
+        self.assertEqual(got,
+                         n_lines_in,
+                         "decompression returned %d lines instead of %d" % (got, n_lines_in))