cache_pos, cache_data = self._readline_cache
if self.tell() == cache_pos:
data = [cache_data]
+ self._filepos += len(cache_data)
else:
data = ['']
data_size = len(data[-1])
except ValueError:
nextline_index = len(data)
nextline_index = min(nextline_index, size)
+ self._filepos -= len(data) - nextline_index
self._readline_cache = (self.tell(), data[nextline_index:])
return data[:nextline_index]
# Exercise the bz2 case by delegating to check_decompression, passing the
# 'bz2' label together with bz2.compress (the helper is defined outside this view).
def test_bz2_decompression(self):
self.check_decompression('bz2', bz2.compress)
+ def test_readline_then_readlines(self):  # readlines() called after readline() must return only the lines not yet read
+ reader = self.make_newlines_reader()  # fixture helper defined outside this view
+ data = reader.readline()
+ self.assertEqual('one\n', data)  # the first line is consumed by readline()
+ data = reader.readlines()
+ self.assertEqual(['two\n', '\n', 'three\n', 'four\n', '\n'], data)  # every remaining line, blank lines included, with none repeated or skipped
+
+ def test_readline_then_readall(self):  # readall() called after readline() must yield only the data not yet read
+ reader = self.make_newlines_reader()  # fixture helper defined outside this view
+ data = reader.readline()
+ self.assertEqual('one\n', data)  # the first line is consumed by readline()
+ self.assertEqual(''.join(['two\n', '\n', 'three\n', 'four\n', '\n']), ''.join(reader.readall()))  # readall() pieces are joined, so only the concatenated remainder is compared, not how it is chunked
+
class StreamRetryTestMixin(object):
# Define reader_for(coll_name, **kwargs)