# ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
import arvados
-import bz2
import copy
import hashlib
import mock
import os
import pprint
import re
-import subprocess
import tempfile
import unittest
[2, '.', 'ob.txt', 'ob'],
[0, '.', 'zero.txt', '']])
- def _test_readline(self, what_in, what_out):
- cw = arvados.CollectionWriter(self.api_client)
- cw.start_new_file('test.txt')
- cw.write(what_in)
- test1 = cw.finish()
- cr = arvados.CollectionReader(test1, self.api_client)
- got = []
- for x in list(cr.all_files())[0].readlines():
- got += [x]
- self.assertEqual(got,
- what_out,
- "readlines did not split lines correctly: %s" % got)
-
- def test_collection_readline(self):
- self._test_readline("\na\nbcd\n\nefg\nz",
- ["\n", "a\n", "bcd\n", "\n", "efg\n", "z"])
- self._test_readline("ab\ncd\n",
- ["ab\n", "cd\n"])
-
def test_collection_empty_file(self):
cw = arvados.CollectionWriter(self.api_client)
cw.start_new_file('zero.txt')
got_sizes += [f.size()]
self.assertEqual(got_sizes, expect_sizes, "got wrong file sizes %s, expected %s" % (got_sizes, expect_sizes))
- def test_collection_bz2_decompression(self):
- n_lines_in = 2**18
- data_in = "abc\n"
- for x in xrange(0, 18):
- data_in += data_in
- compressed_data_in = bz2.compress(data_in)
- cw = arvados.CollectionWriter(self.api_client)
- cw.start_new_file('test.bz2')
- cw.write(compressed_data_in)
- bz2_manifest = cw.manifest_text()
-
- cr = arvados.CollectionReader(bz2_manifest, self.api_client)
-
- got = 0
- for x in list(cr.all_files())[0].readlines():
- self.assertEqual(x, "abc\n", "decompression returned wrong data: %s" % x)
- got += 1
- self.assertEqual(got,
- n_lines_in,
- "decompression returned %d lines instead of %d" % (got, n_lines_in))
-
- def test_collection_gzip_decompression(self):
- n_lines_in = 2**18
- data_in = "abc\n"
- for x in xrange(0, 18):
- data_in += data_in
- p = subprocess.Popen(["gzip", "-1cn"],
- stdout=subprocess.PIPE,
- stdin=subprocess.PIPE,
- stderr=subprocess.PIPE,
- shell=False, close_fds=True)
- compressed_data_in, stderrdata = p.communicate(data_in)
-
- cw = arvados.CollectionWriter(self.api_client)
- cw.start_new_file('test.gz')
- cw.write(compressed_data_in)
- gzip_manifest = cw.manifest_text()
-
- cr = arvados.CollectionReader(gzip_manifest, self.api_client)
- got = 0
- for x in list(cr.all_files())[0].readlines():
- self.assertEqual(x, "abc\n", "decompression returned wrong data: %s" % x)
- got += 1
- self.assertEqual(got,
- n_lines_in,
- "decompression returned %d lines instead of %d" % (got, n_lines_in))
-
def test_normalized_collection(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
self.assertEqual(arvados.locators_and_ranges(blocks, 11, 15), [['b', 15, 1, 14],
['c', 5, 0, 1]])
- class MockStreamReader(object):
- def __init__(self, content):
- self.content = content
- self.num_retries = 0
-
- def readfrom(self, start, size, num_retries=0):
- return self.content[start:start+size]
-
- def test_file_stream(self):
- content = 'abcdefghijklmnopqrstuvwxyz0123456789'
- msr = self.MockStreamReader(content)
- segments = [[0, 10, 0],
- [10, 15, 10],
- [25, 5, 25]]
-
- sfr = arvados.StreamFileReader(msr, segments, "test")
-
- self.assertEqual(sfr.name(), "test")
- self.assertEqual(sfr.size(), 30)
-
- self.assertEqual(sfr.readfrom(0, 30), content[0:30])
- self.assertEqual(sfr.readfrom(2, 30), content[2:30])
-
- self.assertEqual(sfr.readfrom(2, 8), content[2:10])
- self.assertEqual(sfr.readfrom(0, 10), content[0:10])
-
- self.assertEqual(sfr.tell(), 0)
- self.assertEqual(sfr.read(5), content[0:5])
- self.assertEqual(sfr.tell(), 5)
- self.assertEqual(sfr.read(5), content[5:10])
- self.assertEqual(sfr.tell(), 10)
- self.assertEqual(sfr.read(5), content[10:15])
- self.assertEqual(sfr.tell(), 15)
- self.assertEqual(sfr.read(5), content[15:20])
- self.assertEqual(sfr.tell(), 20)
- self.assertEqual(sfr.read(5), content[20:25])
- self.assertEqual(sfr.tell(), 25)
- self.assertEqual(sfr.read(5), content[25:30])
- self.assertEqual(sfr.tell(), 30)
- self.assertEqual(sfr.read(5), '')
- self.assertEqual(sfr.tell(), 30)
-
- segments = [[26, 10, 0],
- [0, 15, 10],
- [15, 5, 25]]
-
- sfr = arvados.StreamFileReader(msr, segments, "test")
-
- self.assertEqual(sfr.size(), 30)
-
- self.assertEqual(sfr.readfrom(0, 30), content[26:36] + content[0:20])
- self.assertEqual(sfr.readfrom(2, 30), content[28:36] + content[0:20])
-
- self.assertEqual(sfr.readfrom(2, 8), content[28:36])
- self.assertEqual(sfr.readfrom(0, 10), content[26:36])
-
- self.assertEqual(sfr.tell(), 0)
- self.assertEqual(sfr.read(5), content[26:31])
- self.assertEqual(sfr.tell(), 5)
- self.assertEqual(sfr.read(5), content[31:36])
- self.assertEqual(sfr.tell(), 10)
- self.assertEqual(sfr.read(5), content[0:5])
- self.assertEqual(sfr.tell(), 15)
- self.assertEqual(sfr.read(5), content[5:10])
- self.assertEqual(sfr.tell(), 20)
- self.assertEqual(sfr.read(5), content[10:15])
- self.assertEqual(sfr.tell(), 25)
- self.assertEqual(sfr.read(5), content[15:20])
- self.assertEqual(sfr.tell(), 30)
- self.assertEqual(sfr.read(5), '')
- self.assertEqual(sfr.tell(), 30)
-
-
class MockKeep(object):
def __init__(self, content, num_retries=0):
self.content = content
self.assertEqual(sr.readfrom(25, 5), content[25:30])
self.assertEqual(sr.readfrom(30, 5), '')
- def test_file_reader(self):
- keepblocks = {'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': 'abcdefghij',
- 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': 'klmnopqrstuvwxy',
- 'cccccccccccccccccccccccccccccccc+5': 'z0123'}
- mk = self.MockKeep(keepblocks)
-
- sr = arvados.StreamReader([".", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15", "cccccccccccccccccccccccccccccccc+5", "0:10:foo", "15:10:foo"], mk)
-
- content = 'abcdefghijpqrstuvwxy'
-
- f = sr.files()["foo"]
-
- # f.read() calls will be aligned on block boundaries (as a
- # result of ticket #3663).
-
- f.seek(0)
- self.assertEqual(f.read(20), content[0:10])
-
- f.seek(0)
- self.assertEqual(f.read(6), content[0:6])
- self.assertEqual(f.read(6), content[6:10])
- self.assertEqual(f.read(6), content[10:16])
- self.assertEqual(f.read(6), content[16:20])
-
def test_extract_file(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md6sum.txt
def test_locator_init(self):
client = self.api_client_mock(200)
# Ensure Keep will not return anything if asked.
- with tutil.mock_responses(None, 404):
+ with tutil.mock_get_responses(None, 404):
reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
api_client=client)
self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
# been written to Keep.
client = self.api_client_mock(200)
self.mock_get_collection(client, 404, None)
- with tutil.mock_responses(self.DEFAULT_MANIFEST, 200):
+ with tutil.mock_get_responses(self.DEFAULT_MANIFEST, 200):
reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
api_client=client)
self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
client = self.api_client_mock(404)
reader = arvados.CollectionReader(self.DEFAULT_UUID,
api_client=client)
- with tutil.mock_responses(self.DEFAULT_MANIFEST, 200):
+ with tutil.mock_get_responses(self.DEFAULT_MANIFEST, 200):
with self.assertRaises(arvados.errors.ApiError):
reader.manifest_text()
# To verify that CollectionReader tries Keep first here, we
# mock API server to return the wrong data.
client = self.api_client_mock(200)
- with tutil.mock_responses(self.ALT_MANIFEST, 200):
+ with tutil.mock_get_responses(self.ALT_MANIFEST, 200):
self.assertEqual(
self.ALT_MANIFEST,
arvados.CollectionReader(
client = self.api_client_mock(200)
reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
num_retries=3)
- with tutil.mock_responses('foo', 500, 500, 200):
+ with tutil.mock_get_responses('foo', 500, 500, 200):
self.assertEqual('foo',
''.join(f.read(9) for f in reader.all_files()))
[[f.size(), f.stream_name(), f.name()]
for f in reader.all_streams()[0].all_files()])
+ def test_read_empty_collection(self):
+ # A reader built from the zero-length locator (d41d8cd9...+0, the MD5
+ # of no data) must report an empty manifest text.  The API mock is
+ # primed with mock_get_collection(client, 200, 'empty') -- presumably a
+ # fixture named 'empty'; confirm against the helper's definition.
+ client = self.api_client_mock(200)
+ self.mock_get_collection(client, 200, 'empty')
+ reader = arvados.CollectionReader('d41d8cd98f00b204e9800998ecf8427e+0',
+ api_client=client)
+ self.assertEqual('', reader.manifest_text())
+
+ def test_api_response(self):
+ # api_response() on a reader built from DEFAULT_UUID with a mocked API
+ # client must equal self.DEFAULT_COLLECTION.
+ client = self.api_client_mock()
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+ self.assertEqual(self.DEFAULT_COLLECTION, reader.api_response())
+
+ def test_api_response_with_collection_from_keep(self):
+ # mock_get_collection is called with status 404 for the API client,
+ # while tutil.mock_get_responses serves DEFAULT_MANIFEST with a 200
+ # (presumably standing in for Keep, as the test name suggests -- confirm).
+ # In that situation api_response() is expected to be None.
+ client = self.api_client_mock()
+ self.mock_get_collection(client, 404, 'foo')
+ with tutil.mock_get_responses(self.DEFAULT_MANIFEST, 200):
+ reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
+ api_client=client)
+ api_response = reader.api_response()
+ self.assertIsNone(api_response)
+
+ def check_open_file(self, coll_file, stream_name, file_name, file_size):
+ # Assertion helper called by the reader.open() tests: coll_file must not
+ # be closed, and its stream_name(), name() and size() must equal the
+ # expected stream_name, file_name and file_size arguments.
+ self.assertFalse(coll_file.closed, "returned file is not open")
+ self.assertEqual(stream_name, coll_file.stream_name())
+ self.assertEqual(file_name, coll_file.name())
+ self.assertEqual(file_size, coll_file.size())
+
+ def test_open_collection_file_one_argument(self):
+ # open() is given one combined path, './foo'; the returned file must be
+ # open and report stream '.', name 'foo' and size 3.
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+ cfile = reader.open('./foo')
+ self.check_open_file(cfile, '.', 'foo', 3)
+
+ def test_open_collection_file_two_arguments(self):
+ # open() is given the stream name and the file name as two separate
+ # arguments ('.', 'foo'); the returned file must be open and report
+ # stream '.', name 'foo' and size 3.
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+ cfile = reader.open('.', 'foo')
+ self.check_open_file(cfile, '.', 'foo', 3)
+
+ def test_open_deep_file(self):
+ # Opens a file by a single path with several directory components.  The
+ # reader is built from the 'collection_with_files_in_subdir' entry of
+ # API_COLLECTIONS; the opened file must report the directory part
+ # './subdir2/subdir3' as its stream name, 'file2_in_subdir3.txt' as its
+ # name, and a size of 32.
+ coll_name = 'collection_with_files_in_subdir'
+ client = self.api_client_mock(200)
+ self.mock_get_collection(client, 200, coll_name)
+ reader = arvados.CollectionReader(
+ self.API_COLLECTIONS[coll_name]['uuid'], api_client=client)
+ cfile = reader.open('./subdir2/subdir3/file2_in_subdir3.txt')
+ self.check_open_file(cfile, './subdir2/subdir3', 'file2_in_subdir3.txt',
+ 32)
+
+ def test_open_nonexistent_stream(self):
+ # reader.open('./nonexistent', 'foo') names a stream this test expects
+ # to be absent from the collection; the call must raise ValueError.
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+ self.assertRaises(ValueError, reader.open, './nonexistent', 'foo')
+
+ def test_open_nonexistent_file(self):
+ # reader.open('.', 'nonexistent') names a file this test expects to be
+ # absent from stream '.'; the call must raise ValueError.
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+ self.assertRaises(ValueError, reader.open, '.', 'nonexistent')
+
@tutil.skip_sleep
class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
def mock_keep(self, body, *codes, **headers):
+ # Defaults the 'x-keep-replicas-stored' header to 2 unless the caller
+ # supplies its own value, then returns the tutil mock built from body,
+ # codes and headers.  The writer tests use the result in a `with`
+ # statement.
headers.setdefault('x-keep-replicas-stored', 2)
- return tutil.mock_responses(body, *codes, **headers)
+ return tutil.mock_put_responses(body, *codes, **headers)
def foo_writer(self, **kwargs):
api_client = self.api_client_mock()
writer.flush_data()
self.assertEqual(self.DEFAULT_MANIFEST, writer.manifest_text())
+ def test_one_open(self):
+ # Writes one file through writer.open() used as a context manager and
+ # checks the writer state, the file's closed state, and the manifest.
+ client = self.api_client_mock()
+ writer = arvados.CollectionWriter(client)
+ with writer.open('out') as out_file:
+ # open('out') with a single argument is expected to select stream '.'
+ # and file name 'out' on the writer.
+ self.assertEqual('.', writer.current_stream_name())
+ self.assertEqual('out', writer.current_file_name())
+ out_file.write('test data')
+ # Expected block locator: MD5 hex digest of the data, then '+' and the
+ # data length (9 characters here).
+ data_loc = hashlib.md5('test data').hexdigest() + '+9'
+ # The file must be closed once the context has ended, and writing to it
+ # afterwards must raise ValueError.
+ self.assertTrue(out_file.closed, "writer file not closed after context")
+ self.assertRaises(ValueError, out_file.write, 'extra text')
+ # manifest_text() is evaluated under mock_keep(data_loc, 200).
+ with self.mock_keep(data_loc, 200) as keep_mock:
+ self.assertEqual(". {} 0:9:out\n".format(data_loc),
+ writer.manifest_text())
+
+ def test_open_writelines(self):
+ # writelines(['12', '34', '56']) is expected to store the concatenation
+ # '123456': the expected locator is the MD5 of '123456' plus '+6', and
+ # the manifest must list one 6-byte file named 'six' in stream '.'.
+ client = self.api_client_mock()
+ writer = arvados.CollectionWriter(client)
+ with writer.open('six') as out_file:
+ out_file.writelines(['12', '34', '56'])
+ data_loc = hashlib.md5('123456').hexdigest() + '+6'
+ with self.mock_keep(data_loc, 200) as keep_mock:
+ self.assertEqual(". {} 0:6:six\n".format(data_loc),
+ writer.manifest_text())
+
+ def test_open_flush(self):
+ # Calls out_file.flush() between two writes.  The expected manifest
+ # lists two separate locators (one for 'flush1', one for 'flush2')
+ # followed by a single 12-byte file entry 'flush_test' spanning both.
+ client = self.api_client_mock()
+ writer = arvados.CollectionWriter(client)
+ with writer.open('flush_test') as out_file:
+ out_file.write('flush1')
+ data_loc1 = hashlib.md5('flush1').hexdigest() + '+6'
+ # flush() runs under a mocked Keep response for the first locator.
+ with self.mock_keep(data_loc1, 200) as keep_mock:
+ out_file.flush()
+ out_file.write('flush2')
+ data_loc2 = hashlib.md5('flush2').hexdigest() + '+6'
+ with self.mock_keep(data_loc2, 200) as keep_mock:
+ self.assertEqual(". {} {} 0:12:flush_test\n".format(data_loc1,
+ data_loc2),
+ writer.manifest_text())
+
+ def test_two_opens_same_stream(self):
+ # Writes files '1' and '2' one after the other into stream '.'.  The
+ # expected manifest has a single locator (MD5 of '1st2nd', 6 bytes)
+ # with the two files as consecutive segments 0:3:1 and 3:3:2.
+ client = self.api_client_mock()
+ writer = arvados.CollectionWriter(client)
+ with writer.open('.', '1') as out_file:
+ out_file.write('1st')
+ with writer.open('.', '2') as out_file:
+ out_file.write('2nd')
+ data_loc = hashlib.md5('1st2nd').hexdigest() + '+6'
+ with self.mock_keep(data_loc, 200) as keep_mock:
+ self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc),
+ writer.manifest_text())
+
+ def test_two_opens_two_streams(self):
+ # Writes 'file' via open('file') and 'indir' via open('./dir', 'indir').
+ # The expected manifest has one line per stream ('.' and './dir'), each
+ # with its own locator and a single file entry.
+ client = self.api_client_mock()
+ writer = arvados.CollectionWriter(client)
+ with writer.open('file') as out_file:
+ out_file.write('file')
+ data_loc1 = hashlib.md5('file').hexdigest() + '+4'
+ # A mocked Keep response for the first locator is set up before the
+ # second open() -- presumably because starting a new stream stores the
+ # pending block; confirm against CollectionWriter.
+ with self.mock_keep(data_loc1, 200) as keep_mock:
+ with writer.open('./dir', 'indir') as out_file:
+ out_file.write('indir')
+ data_loc2 = hashlib.md5('indir').hexdigest() + '+5'
+ with self.mock_keep(data_loc2, 200) as keep_mock:
+ expected = ". {} 0:4:file\n./dir {} 0:5:indir\n".format(
+ data_loc1, data_loc2)
+ self.assertEqual(expected, writer.manifest_text())
+
+ def test_dup_open_fails(self):
+ # file1 is obtained from writer.open('one') and never closed here; a
+ # second writer.open('two') must then raise arvados.errors.AssertionError.
+ client = self.api_client_mock()
+ writer = arvados.CollectionWriter(client)
+ file1 = writer.open('one')
+ self.assertRaises(arvados.errors.AssertionError, writer.open, 'two')
+
if __name__ == '__main__':
unittest.main()