Merge branch '4759-timestamp-precision-TC' closes #4759
[arvados.git] / sdk / python / tests / test_collections.py
index 30e70449b0d540d6c093374df0ac7329363d007b..dbbe3f5e73deca582b65f42900f8181e52a63a02 100644 (file)
@@ -3,14 +3,12 @@
 # ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
 
 import arvados
-import bz2
 import copy
 import hashlib
 import mock
 import os
 import pprint
 import re
-import subprocess
 import tempfile
 import unittest
 
@@ -124,25 +122,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
                            [2, '.', 'ob.txt', 'ob'],
                            [0, '.', 'zero.txt', '']])
 
-    def _test_readline(self, what_in, what_out):
-        cw = arvados.CollectionWriter(self.api_client)
-        cw.start_new_file('test.txt')
-        cw.write(what_in)
-        test1 = cw.finish()
-        cr = arvados.CollectionReader(test1, self.api_client)
-        got = []
-        for x in list(cr.all_files())[0].readlines():
-            got += [x]
-        self.assertEqual(got,
-                         what_out,
-                         "readlines did not split lines correctly: %s" % got)
-
-    def test_collection_readline(self):
-        self._test_readline("\na\nbcd\n\nefg\nz",
-                            ["\n", "a\n", "bcd\n", "\n", "efg\n", "z"])
-        self._test_readline("ab\ncd\n",
-                            ["ab\n", "cd\n"])
-
     def test_collection_empty_file(self):
         cw = arvados.CollectionWriter(self.api_client)
         cw.start_new_file('zero.txt')
@@ -179,53 +158,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
             got_sizes += [f.size()]
         self.assertEqual(got_sizes, expect_sizes, "got wrong file sizes %s, expected %s" % (got_sizes, expect_sizes))
 
-    def test_collection_bz2_decompression(self):
-        n_lines_in = 2**18
-        data_in = "abc\n"
-        for x in xrange(0, 18):
-            data_in += data_in
-        compressed_data_in = bz2.compress(data_in)
-        cw = arvados.CollectionWriter(self.api_client)
-        cw.start_new_file('test.bz2')
-        cw.write(compressed_data_in)
-        bz2_manifest = cw.manifest_text()
-
-        cr = arvados.CollectionReader(bz2_manifest, self.api_client)
-
-        got = 0
-        for x in list(cr.all_files())[0].readlines():
-            self.assertEqual(x, "abc\n", "decompression returned wrong data: %s" % x)
-            got += 1
-        self.assertEqual(got,
-                         n_lines_in,
-                         "decompression returned %d lines instead of %d" % (got, n_lines_in))
-
-    def test_collection_gzip_decompression(self):
-        n_lines_in = 2**18
-        data_in = "abc\n"
-        for x in xrange(0, 18):
-            data_in += data_in
-        p = subprocess.Popen(["gzip", "-1cn"],
-                             stdout=subprocess.PIPE,
-                             stdin=subprocess.PIPE,
-                             stderr=subprocess.PIPE,
-                             shell=False, close_fds=True)
-        compressed_data_in, stderrdata = p.communicate(data_in)
-
-        cw = arvados.CollectionWriter(self.api_client)
-        cw.start_new_file('test.gz')
-        cw.write(compressed_data_in)
-        gzip_manifest = cw.manifest_text()
-
-        cr = arvados.CollectionReader(gzip_manifest, self.api_client)
-        got = 0
-        for x in list(cr.all_files())[0].readlines():
-            self.assertEqual(x, "abc\n", "decompression returned wrong data: %s" % x)
-            got += 1
-        self.assertEqual(got,
-                         n_lines_in,
-                         "decompression returned %d lines instead of %d" % (got, n_lines_in))
-
     def test_normalized_collection(self):
         m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
 . 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
@@ -370,79 +302,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
         self.assertEqual(arvados.locators_and_ranges(blocks, 11, 15), [['b', 15, 1, 14],
                                                                        ['c', 5, 0, 1]])
 
-    class MockStreamReader(object):
-        def __init__(self, content):
-            self.content = content
-            self.num_retries = 0
-
-        def readfrom(self, start, size, num_retries=0):
-            return self.content[start:start+size]
-
-    def test_file_stream(self):
-        content = 'abcdefghijklmnopqrstuvwxyz0123456789'
-        msr = self.MockStreamReader(content)
-        segments = [[0, 10, 0],
-                    [10, 15, 10],
-                    [25, 5, 25]]
-
-        sfr = arvados.StreamFileReader(msr, segments, "test")
-
-        self.assertEqual(sfr.name(), "test")
-        self.assertEqual(sfr.size(), 30)
-
-        self.assertEqual(sfr.readfrom(0, 30), content[0:30])
-        self.assertEqual(sfr.readfrom(2, 30), content[2:30])
-
-        self.assertEqual(sfr.readfrom(2, 8), content[2:10])
-        self.assertEqual(sfr.readfrom(0, 10), content[0:10])
-
-        self.assertEqual(sfr.tell(), 0)
-        self.assertEqual(sfr.read(5), content[0:5])
-        self.assertEqual(sfr.tell(), 5)
-        self.assertEqual(sfr.read(5), content[5:10])
-        self.assertEqual(sfr.tell(), 10)
-        self.assertEqual(sfr.read(5), content[10:15])
-        self.assertEqual(sfr.tell(), 15)
-        self.assertEqual(sfr.read(5), content[15:20])
-        self.assertEqual(sfr.tell(), 20)
-        self.assertEqual(sfr.read(5), content[20:25])
-        self.assertEqual(sfr.tell(), 25)
-        self.assertEqual(sfr.read(5), content[25:30])
-        self.assertEqual(sfr.tell(), 30)
-        self.assertEqual(sfr.read(5), '')
-        self.assertEqual(sfr.tell(), 30)
-
-        segments = [[26, 10, 0],
-                    [0, 15, 10],
-                    [15, 5, 25]]
-
-        sfr = arvados.StreamFileReader(msr, segments, "test")
-
-        self.assertEqual(sfr.size(), 30)
-
-        self.assertEqual(sfr.readfrom(0, 30), content[26:36] + content[0:20])
-        self.assertEqual(sfr.readfrom(2, 30), content[28:36] + content[0:20])
-
-        self.assertEqual(sfr.readfrom(2, 8), content[28:36])
-        self.assertEqual(sfr.readfrom(0, 10), content[26:36])
-
-        self.assertEqual(sfr.tell(), 0)
-        self.assertEqual(sfr.read(5), content[26:31])
-        self.assertEqual(sfr.tell(), 5)
-        self.assertEqual(sfr.read(5), content[31:36])
-        self.assertEqual(sfr.tell(), 10)
-        self.assertEqual(sfr.read(5), content[0:5])
-        self.assertEqual(sfr.tell(), 15)
-        self.assertEqual(sfr.read(5), content[5:10])
-        self.assertEqual(sfr.tell(), 20)
-        self.assertEqual(sfr.read(5), content[10:15])
-        self.assertEqual(sfr.tell(), 25)
-        self.assertEqual(sfr.read(5), content[15:20])
-        self.assertEqual(sfr.tell(), 30)
-        self.assertEqual(sfr.read(5), '')
-        self.assertEqual(sfr.tell(), 30)
-
-
     class MockKeep(object):
         def __init__(self, content, num_retries=0):
             self.content = content
@@ -474,30 +333,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
         self.assertEqual(sr.readfrom(25, 5), content[25:30])
         self.assertEqual(sr.readfrom(30, 5), '')
 
-    def test_file_reader(self):
-        keepblocks = {'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': 'abcdefghij',
-                      'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': 'klmnopqrstuvwxy',
-                      'cccccccccccccccccccccccccccccccc+5': 'z0123'}
-        mk = self.MockKeep(keepblocks)
-
-        sr = arvados.StreamReader([".", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15", "cccccccccccccccccccccccccccccccc+5", "0:10:foo", "15:10:foo"], mk)
-
-        content = 'abcdefghijpqrstuvwxy'
-
-        f = sr.files()["foo"]
-
-        # f.read() calls will be aligned on block boundaries (as a
-        # result of ticket #3663).
-
-        f.seek(0)
-        self.assertEqual(f.read(20), content[0:10])
-
-        f.seek(0)
-        self.assertEqual(f.read(6), content[0:6])
-        self.assertEqual(f.read(6), content[6:10])
-        self.assertEqual(f.read(6), content[10:16])
-        self.assertEqual(f.read(6), content[16:20])
-
     def test_extract_file(self):
         m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
 . 085c37f02916da1cad16f93c54d899b7+41 0:41:md6sum.txt
@@ -665,17 +500,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
                 ).manifest_text())
 
 
-class CollectionTestMixin(object):
-    PROXY_RESPONSE = {
-        'items_available': 1,
-        'items': [{
-                'uuid': 'zzzzz-bi6l4-mockproxy012345',
-                'owner_uuid': 'zzzzz-tpzed-mockowner012345',
-                'service_host': tutil.TEST_HOST,
-                'service_port': 65535,
-                'service_ssl_flag': True,
-                'service_type': 'proxy',
-                }]}
+class CollectionTestMixin(tutil.ApiClientMock):
     API_COLLECTIONS = run_test_server.fixture('collections')
     DEFAULT_COLLECTION = API_COLLECTIONS['foo_file']
     DEFAULT_DATA_HASH = DEFAULT_COLLECTION['portable_data_hash']
@@ -685,20 +510,9 @@ class CollectionTestMixin(object):
     ALT_DATA_HASH = ALT_COLLECTION['portable_data_hash']
     ALT_MANIFEST = ALT_COLLECTION['manifest_text']
 
-    def _mock_api_call(self, mock_method, code, body):
-        mock_method = mock_method().execute
-        if code == 200:
-            mock_method.return_value = body
-        else:
-            mock_method.side_effect = arvados.errors.ApiError(
-                tutil.fake_httplib2_response(code), "{}")
-
-    def mock_keep_services(self, api_mock, code, body):
-        self._mock_api_call(api_mock.keep_services().accessible, code, body)
-
-    def api_client_mock(self, code=200):
-        client = mock.MagicMock(name='api_client')
-        self.mock_keep_services(client, code, self.PROXY_RESPONSE)
+    def api_client_mock(self, status=200):  # parent's API mock plus one Keep proxy service answering with status
+        client = super(CollectionTestMixin, self).api_client_mock()
+        self.mock_keep_services(client, status=status, service_type='proxy', count=1)
         return client
 
 
@@ -708,9 +522,9 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
         body = self.API_COLLECTIONS.get(body)
         self._mock_api_call(api_mock.collections().get, code, body)
 
-    def api_client_mock(self, code=200):
-        client = super(CollectionReaderTestCase, self).api_client_mock(code)
-        self.mock_get_collection(client, code, 'foo_file')
+    def api_client_mock(self, status=200):  # status only affects collections().get
+        client = super(CollectionReaderTestCase, self).api_client_mock()
+        self.mock_get_collection(client, status, 'foo_file')
         return client
 
     def test_init_no_default_retries(self):
@@ -808,6 +622,58 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
                                           api_client=client)
         self.assertEqual('', reader.manifest_text())
 
+    def test_api_response(self):  # reader exposes the collection record the API returned
+        client = self.api_client_mock()
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        self.assertEqual(self.DEFAULT_COLLECTION, reader.api_response())
+
+    def test_api_response_with_collection_from_keep(self):
+        client = self.api_client_mock()
+        self.mock_get_collection(client, 404, 'foo')  # API lookup fails
+        with tutil.mock_get_responses(self.DEFAULT_MANIFEST, 200):  # mocked GET supplies the manifest
+            reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
+                                              api_client=client)
+            api_response = reader.api_response()
+        self.assertIsNone(api_response)  # no API record when loaded this way
+
+    def check_open_file(self, coll_file, stream_name, file_name, file_size):  # shared checks for open() results
+        self.assertFalse(coll_file.closed, "returned file is not open")
+        self.assertEqual(stream_name, coll_file.stream_name())
+        self.assertEqual(file_name, coll_file.name())
+        self.assertEqual(file_size, coll_file.size())
+
+    def test_open_collection_file_one_argument(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        cfile = reader.open('./foo')  # one argument: stream path and file name joined
+        self.check_open_file(cfile, '.', 'foo', 3)
+
+    def test_open_collection_file_two_arguments(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        cfile = reader.open('.', 'foo')  # two arguments: stream name, then file name
+        self.check_open_file(cfile, '.', 'foo', 3)
+
+    def test_open_deep_file(self):
+        coll_name = 'collection_with_files_in_subdir'
+        client = self.api_client_mock(200)
+        self.mock_get_collection(client, 200, coll_name)  # presumably replaces the foo_file response; verify
+        reader = arvados.CollectionReader(
+            self.API_COLLECTIONS[coll_name]['uuid'], api_client=client)
+        cfile = reader.open('./subdir2/subdir3/file2_in_subdir3.txt')  # file inside a nested stream
+        self.check_open_file(cfile, './subdir2/subdir3', 'file2_in_subdir3.txt',
+                             32)
+
+    def test_open_nonexistent_stream(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        self.assertRaises(ValueError, reader.open, './nonexistent', 'foo')  # unknown stream name
+
+    def test_open_nonexistent_file(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        self.assertRaises(ValueError, reader.open, '.', 'nonexistent')  # unknown file in an existing stream
+
 
 @tutil.skip_sleep
 class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
@@ -816,8 +682,8 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
         return tutil.mock_put_responses(body, *codes, **headers)
 
     def foo_writer(self, **kwargs):
-        api_client = self.api_client_mock()
-        writer = arvados.CollectionWriter(api_client, **kwargs)
+        kwargs.setdefault('api_client', self.api_client_mock())  # callers may pass their own; default mock is built either way
+        writer = arvados.CollectionWriter(**kwargs)
         writer.start_new_file('foo')
         writer.write('foo')
         return writer
@@ -833,6 +699,32 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 writer.finish()
 
+    def test_write_insufficient_replicas_via_proxy(self):
+        writer = self.foo_writer(replication=3)  # proxy reply below claims only 2 copies
+        with self.mock_keep(None, 200, **{'x-keep-replicas-stored': 2}):
+            with self.assertRaises(arvados.errors.KeepWriteError):  # 2 stored < 3 requested
+                writer.manifest_text()
+
+    def test_write_insufficient_replicas_via_disks(self):
+        client = mock.MagicMock(name='api_client')
+        self.mock_keep_services(client, status=200, service_type='disk', count=2)
+        writer = self.foo_writer(api_client=client, replication=3)
+        with self.mock_keep(
+                None, 200, 200,  # two successes of 1 copy each: 2 < 3 requested
+                **{'x-keep-replicas-stored': 1}):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                writer.manifest_text()
+
+    def test_write_three_replicas(self):
+        client = mock.MagicMock(name='api_client')
+        self.mock_keep_services(client, status=200, service_type='disk', count=6)
+        writer = self.foo_writer(api_client=client, replication=3)
+        with self.mock_keep(
+                None, 500, 500, 500, 200, 200, 200,  # 3 failures, then 3 single-copy writes
+                **{'x-keep-replicas-stored': 1}) as keepmock:
+            writer.manifest_text()
+            self.assertEqual(6, keepmock.call_count)  # one request per mocked response
+
     def test_write_whole_collection_through_retries(self):
         writer = self.foo_writer(num_retries=2)
         with self.mock_keep(self.DEFAULT_DATA_HASH,
@@ -846,6 +738,78 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
             writer.flush_data()
         self.assertEqual(self.DEFAULT_MANIFEST, writer.manifest_text())
 
+    def test_one_open(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('out') as out_file:  # bare file name lands in the '.' stream
+            self.assertEqual('.', writer.current_stream_name())
+            self.assertEqual('out', writer.current_file_name())
+            out_file.write('test data')
+            data_loc = hashlib.md5('test data').hexdigest() + '+9'
+        self.assertTrue(out_file.closed, "writer file not closed after context")
+        self.assertRaises(ValueError, out_file.write, 'extra text')  # closed file rejects writes
+        with self.mock_keep(data_loc, 200):
+            self.assertEqual(". {} 0:9:out\n".format(data_loc),
+                             writer.manifest_text())
+
+    def test_open_writelines(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('six') as out_file:
+            out_file.writelines(['12', '34', '56'])  # concatenated into one 6-character file
+            data_loc = hashlib.md5('123456').hexdigest() + '+6'
+        with self.mock_keep(data_loc, 200):
+            self.assertEqual(". {} 0:6:six\n".format(data_loc),
+                             writer.manifest_text())
+
+    def test_open_flush(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('flush_test') as out_file:
+            out_file.write('flush1')
+            data_loc1 = hashlib.md5('flush1').hexdigest() + '+6'
+            with self.mock_keep(data_loc1, 200):
+                out_file.flush()  # expected to upload 'flush1' as its own block
+            out_file.write('flush2')
+            data_loc2 = hashlib.md5('flush2').hexdigest() + '+6'
+        with self.mock_keep(data_loc2, 200):
+            self.assertEqual(". {} {} 0:12:flush_test\n".format(data_loc1,
+                                                                data_loc2),
+                             writer.manifest_text())
+
+    def test_two_opens_same_stream(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('.', '1') as out_file:
+            out_file.write('1st')
+        with writer.open('.', '2') as out_file:
+            out_file.write('2nd')
+        data_loc = hashlib.md5('1st2nd').hexdigest() + '+6'  # both files share one block
+        with self.mock_keep(data_loc, 200):
+            self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc),
+                             writer.manifest_text())
+
+    def test_two_opens_two_streams(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('file') as out_file:
+            out_file.write('file')
+            data_loc1 = hashlib.md5('file').hexdigest() + '+4'
+        with self.mock_keep(data_loc1, 200):  # switching streams is expected to upload 'file'
+            with writer.open('./dir', 'indir') as out_file:
+                out_file.write('indir')
+                data_loc2 = hashlib.md5('indir').hexdigest() + '+5'
+        with self.mock_keep(data_loc2, 200):
+            expected = ". {} 0:4:file\n./dir {} 0:5:indir\n".format(
+                data_loc1, data_loc2)
+            self.assertEqual(expected, writer.manifest_text())
+
+    def test_dup_open_fails(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('one'):  # a second open() while this file is open must fail
+            self.assertRaises(arvados.errors.AssertionError, writer.open, 'two')
+
 
 if __name__ == '__main__':
     unittest.main()