-#!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
-from __future__ import absolute_import
-from builtins import object
import bz2
import gzip
import io
-import mock
import os
import unittest
import hashlib
+from unittest import mock
+
import arvados
-from arvados import StreamReader, StreamFileReader
from arvados._ranges import Range
from . import arvados_testutil as tutil
from . import run_test_server
-class StreamFileReaderTestCase(unittest.TestCase):
- def make_count_reader(self):
- stream = tutil.MockStreamReader('.', '01234', '34567', '67890')
- return StreamFileReader(stream, [Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)],
- 'count.txt')
-
+class StreamFileReaderTestMixin(object):
def test_read_block_crossing_behavior(self):
# read() calls will be aligned on block boundaries - see #3663.
sfile = self.make_count_reader()
def test_successive_reads(self):
sfile = self.make_count_reader()
- for expect in [b'123', b'456', b'789', b'']:
- self.assertEqual(expect, sfile.read(10))
+ for expect in [b'1234', b'5678', b'9', b'']:
+ self.assertEqual(expect, sfile.read(4))
def test_readfrom_spans_blocks(self):
sfile = self.make_count_reader()
def test_seek_min_zero(self):
sfile = self.make_count_reader()
- sfile.seek(-2, os.SEEK_SET)
+ self.assertEqual(0, sfile.tell())
+ with self.assertRaises(IOError):
+ sfile.seek(-2, os.SEEK_SET)
self.assertEqual(0, sfile.tell())
def test_seek_max_size(self):
sfile = self.make_count_reader()
sfile.seek(2, os.SEEK_END)
- self.assertEqual(9, sfile.tell())
+ # POSIX permits seeking past end of file.
+ self.assertEqual(11, sfile.tell())
def test_size(self):
self.assertEqual(9, self.make_count_reader().size())
- def test_tell_after_block_read(self):
- sfile = self.make_count_reader()
- sfile.read(5)
- self.assertEqual(3, sfile.tell())
-
def test_tell_after_small_read(self):
sfile = self.make_count_reader()
sfile.read(1)
self.assertEqual(b'12', sfile.read(2))
self.assertTrue(sfile.closed, "reader is open after context")
- def make_newlines_reader(self):
- stream = tutil.MockStreamReader('.', 'one\ntwo\n\nth', 'ree\nfour\n\n')
- return StreamFileReader(stream, [Range(0, 0, 11), Range(11, 11, 10)], 'count.txt')
-
def check_lines(self, actual):
self.assertEqual(['one\n', 'two\n', '\n', 'three\n', 'four\n', '\n'],
actual)
def test_readlines_sizehint(self):
result = self.make_newlines_reader().readlines(8)
- self.assertEqual(['one\n', 'two\n'], result[:2])
- self.assertNotIn('three\n', result)
+ self.assertEqual(['one\n', 'two\n', '\n', 'three\n', 'four\n', '\n'], result)
def test_name_attribute(self):
- # Test both .name and .name() (for backward compatibility)
- stream = tutil.MockStreamReader()
- sfile = StreamFileReader(stream, [Range(0, 0, 0)], 'nametest')
+ sfile = self.make_file_reader(name='nametest')
self.assertEqual('nametest', sfile.name)
- self.assertEqual('nametest', sfile.name())
def check_decompressed_name(self, filename, expect):
- stream = tutil.MockStreamReader('.', '')
- reader = StreamFileReader(stream, [Range(0, 0, 0)], filename)
+ reader = self.make_file_reader(name=filename)
self.assertEqual(expect, reader.decompressed_name())
def test_decompressed_name_uncompressed_file(self):
def check_decompression(self, compress_ext, compress_func):
test_text = b'decompression\ntest\n'
test_data = compress_func(test_text)
- stream = tutil.MockStreamReader('.', test_data)
- reader = StreamFileReader(stream, [Range(0, 0, len(test_data))],
- 'test.' + compress_ext)
+ reader = self.make_file_reader(name='test.'+compress_ext, data=test_data)
self.assertEqual(test_text, b''.join(reader.readall_decompressed()))
@staticmethod
reader = self.reader_for('bar_file')
self.assertEqual(b'bar', self.read_for_test(reader, 3))
- @tutil.skip_sleep
- def test_read_no_default_retry(self):
- with tutil.mock_keep_responses('', 500):
- reader = self.reader_for('user_agreement')
- with self.assertRaises(arvados.errors.KeepReadError):
- self.read_for_test(reader, 10)
-
@tutil.skip_sleep
def test_read_with_instance_retries(self):
with tutil.mock_keep_responses('foo', 500, 200):
self.read_for_test(reader, 10, num_retries=1)
-class StreamReaderTestCase(unittest.TestCase, StreamRetryTestMixin):
- def reader_for(self, coll_name, **kwargs):
- return StreamReader(self.manifest_for(coll_name).split(),
- self.keep_client(), **kwargs)
-
- def read_for_test(self, reader, byte_count, **kwargs):
- return reader.readfrom(0, byte_count, **kwargs)
-
- def test_manifest_text_without_keep_client(self):
- mtext = self.manifest_for('multilevel_collection_1')
- for line in mtext.rstrip('\n').split('\n'):
- reader = StreamReader(line.split())
- self.assertEqual(line + '\n', reader.manifest_text())
-
-
-class StreamFileReadTestCase(unittest.TestCase, StreamRetryTestMixin):
- def reader_for(self, coll_name, **kwargs):
- return StreamReader(self.manifest_for(coll_name).split(),
- self.keep_client(), **kwargs).all_files()[0]
-
- def read_for_test(self, reader, byte_count, **kwargs):
- return reader.read(byte_count, **kwargs)
-
-
-class StreamFileReadFromTestCase(StreamFileReadTestCase):
- def read_for_test(self, reader, byte_count, **kwargs):
- return reader.readfrom(0, byte_count, **kwargs)
-
-
-class StreamFileReadAllTestCase(StreamFileReadTestCase):
- def read_for_test(self, reader, byte_count, **kwargs):
- return b''.join(reader.readall(**kwargs))
-
-
-class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase):
- def read_for_test(self, reader, byte_count, **kwargs):
- return b''.join(reader.readall_decompressed(**kwargs))
-
-
-class StreamFileReadlinesTestCase(StreamFileReadTestCase):
- def read_for_test(self, reader, byte_count, **kwargs):
- return ''.join(reader.readlines(**kwargs)).encode()
-
if __name__ == '__main__':
unittest.main()