import hashlib
import string
import bz2
+import zlib
from apiclient import errors
from apiclient.discovery import build
self._filepos = 0
def name(self):
    # Accessor: the file's name as recorded in the stream manifest.
    return self._name
def decompressed_name(self):
    """Return this file's name with any trailing .bz2 or .gz suffix removed.

    Used to present the logical (decompressed) file name when the stored
    file is compressed. Uses a raw string for the regex so the backslash
    escape is not subject to string-literal escape processing.
    """
    return re.sub(r'\.(bz2|gz)$', '', self._name)
def size(self):
    # Accessor: the file's stored size (self._size, set elsewhere).
    return self._size
def stream_name(self):
data = decompressor.decompress(chunk)
if data and data != '':
yield data
def gunzip(self, size):
    """Generator: read this file in size-byte chunks and yield the
    gzip-decompressed data.

    The wbits value 16+zlib.MAX_WBITS tells zlib to expect a gzip
    (not raw zlib) header.
    """
    inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
    for compressed_chunk in self.readall(size):
        inflated = inflater.decompress(inflater.unconsumed_tail + compressed_chunk)
        if inflated:
            yield inflated
def readlines(self, decompress=True):
self._stream.seek(self._pos + self._filepos)
if decompress and re.search('\.bz2$', self._name):
datasource = self.bunzip2(2**10)
+ elif decompress and re.search('\.gz$', self._name):
+ datasource = self.gunzip(2**10)
else:
datasource = self.readall(2**10)
data = ''
class CollectionWriter:
KEEP_BLOCK_SIZE = 2**26
def __init__(self):
- self._data_buffer = ''
+ self._data_buffer = []
+ self._data_buffer_len = 0
self._current_stream_files = []
self._current_stream_length = 0
self._current_stream_locators = []
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
    """Context-manager exit: finalize the collection.

    The context-manager protocol calls __exit__ with three arguments
    (exc_type, exc_value, traceback); the previous zero-argument
    signature made every `with CollectionWriter() ...` block raise
    TypeError on exit. Defaults keep any direct self.__exit__() callers
    working.
    """
    self.finish()
def write(self, newdata):
    """Append newdata to the buffer, flushing whole Keep blocks as they fill."""
    self._data_buffer.append(newdata)
    added = len(newdata)
    self._data_buffer_len += added
    self._current_stream_length += added
    while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
        self.flush_data()
def flush_data(self):
    """Store up to one Keep block of buffered data and retain the remainder.

    Joins the buffered fragments, sends the first KEEP_BLOCK_SIZE bytes to
    Keep (Keep.put is a project helper — presumably returns a locator;
    confirmed only by usage here), and keeps any overflow buffered.
    """
    data_buffer = ''.join(self._data_buffer)
    if data_buffer != '':
        self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
        self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
        # Keep the cached length in sync with the rebuilt buffer. Without
        # this, write()'s drain loop (`while self._data_buffer_len >=
        # KEEP_BLOCK_SIZE: self.flush_data()`) never sees the length drop
        # and spins forever once a full block has been buffered.
        self._data_buffer_len = len(self._data_buffer[0])
def start_new_file(self, newfilename=None):
    """Close out the file currently being written, then begin a new one.

    newfilename may be None; the current-file name can be set later via
    set_current_file_name.
    """
    self.finish_current_file()
    self.set_current_file_name(newfilename)
import os
import bz2
import sys
+import subprocess
class KeepLocalStoreTest(unittest.TestCase):
def setUp(self):
self.assertEqual(got,
n_lines_in,
"decompression returned %d lines instead of %d" % (got, n_lines_in))
+
class LocalCollectionGzipDecompressionTest(unittest.TestCase):
    """Round-trip test: gzip data with the system gzip binary, store it via
    CollectionWriter, and verify readlines() transparently decompresses it."""
    def setUp(self):
        os.environ['KEEP_LOCAL_STORE'] = '/tmp'
    def runTest(self):
        n_lines_in = 2**18
        data_in = "abc\n" * n_lines_in
        # Compress with the external gzip tool so decompression is tested
        # against independently-produced gzip output.
        proc = subprocess.Popen(["gzip", "-1cn"],
                                stdout=subprocess.PIPE,
                                stdin=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                shell=False, close_fds=True)
        compressed_data_in, _ = proc.communicate(data_in)

        writer = arvados.CollectionWriter()
        writer.start_new_file('test.gz')
        writer.write(compressed_data_in)
        gzip_manifest = writer.manifest_text()

        reader = arvados.CollectionReader(gzip_manifest)
        got = 0
        for line in list(reader.all_files())[0].readlines():
            self.assertEqual(line, "abc\n", "decompression returned wrong data: %s" % line)
            got += 1
        self.assertEqual(got,
                         n_lines_in,
                         "decompression returned %d lines instead of %d" % (got, n_lines_in))