import arvados
import bz2
import copy
-import errno
import os
import pprint
-import shutil
import subprocess
-import sys
import tempfile
import unittest
+from arvados_testutil import ArvadosKeepLocalStoreTestCase
+
class TestResumableWriter(arvados.ResumableCollectionWriter):
KEEP_BLOCK_SIZE = 1024 # PUT to Keep every 1K.
return self.saved_states[-1]
-class ArvadosCollectionsTest(unittest.TestCase):
- def _make_tmpdir(self):
- self._tempdirs.append(tempfile.mkdtemp())
- return self._tempdirs[-1]
-
- def setUp(self):
- self._orig_keep_local_store = os.environ.get('KEEP_LOCAL_STORE')
- self._tempdirs = []
- os.environ['KEEP_LOCAL_STORE'] = self._make_tmpdir()
-
- def tearDown(self):
- for workdir in self._tempdirs:
- shutil.rmtree(workdir, ignore_errors=True)
- if self._orig_keep_local_store is None:
- del os.environ['KEEP_LOCAL_STORE']
- else:
- os.environ['KEEP_LOCAL_STORE'] = self._orig_keep_local_store
-
+class ArvadosCollectionsTest(ArvadosKeepLocalStoreTestCase):
def write_foo_bar_baz(self):
cw = arvados.CollectionWriter()
self.assertEqual(cw.current_stream_name(), '.',
n_lines_in,
"decompression returned %d lines instead of %d" % (got, n_lines_in))
- def data_file(self, filename):
- try:
- basedir = os.path.dirname(__file__)
- except NameError:
- basedir = '.'
- return open(os.path.join(basedir, 'data', filename))
-
def test_normalized_collection(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
self.assertEqual(arvados.CollectionReader(m1).all_streams()[0].files()['md9sum.txt'].as_manifest(),
". 085c37f02916da1cad16f93c54d899b7+41 5348b82a029fd9e971a811ce1f71360b+43 8b22da26f9f433dea0a10e5ec66d73ba+43 40:80:md9sum.txt\n")
- def build_directory_tree(self, tree=['basefile', 'subdir/subfile']):
- tree_root = self._make_tmpdir()
- for leaf in tree:
- path = os.path.join(tree_root, leaf)
- try:
- os.makedirs(os.path.dirname(path))
- except OSError as error:
- if error.errno != errno.EEXIST:
- raise
- with open(path, 'w') as tmpfile:
- tmpfile.write(leaf)
- return tree_root
-
def test_write_directory_tree(self):
cwriter = arvados.CollectionWriter()
- cwriter.write_directory_tree(self.build_directory_tree())
+ cwriter.write_directory_tree(self.build_directory_tree(
+ ['basefile', 'subdir/subfile']))
self.assertEqual(cwriter.manifest_text(),
""". c5110c5ac93202d8e0f9e381f22bac0f+8 0:8:basefile
./subdir 1ca4dec89403084bf282ad31e6cf7972+14 0:14:subfile\n""")
def test_write_named_directory_tree(self):
cwriter = arvados.CollectionWriter()
- cwriter.write_directory_tree(self.build_directory_tree(), 'root')
+ cwriter.write_directory_tree(self.build_directory_tree(
+ ['basefile', 'subdir/subfile']), 'root')
self.assertEqual(
cwriter.manifest_text(),
"""./root c5110c5ac93202d8e0f9e381f22bac0f+8 0:8:basefile
def test_write_directory_tree_in_one_stream(self):
cwriter = arvados.CollectionWriter()
- cwriter.write_directory_tree(self.build_directory_tree(),
- max_manifest_depth=0)
+ cwriter.write_directory_tree(self.build_directory_tree(
+ ['basefile', 'subdir/subfile']), max_manifest_depth=0)
self.assertEqual(cwriter.manifest_text(),
""". 4ace875ffdc6824a04950f06858f4465+22 0:8:basefile
./subdir 4ace875ffdc6824a04950f06858f4465+22 8:14:subfile\n""")
./d1 50170217e5b04312024aa5cd42934494+13 8:5:f2
./d1/d2 50170217e5b04312024aa5cd42934494+13 0:8:f3\n""")
- def make_test_file(self, text="test"):
- testfile = tempfile.NamedTemporaryFile()
- testfile.write(text)
- testfile.flush()
- return testfile
-
def test_write_one_file(self):
cwriter = arvados.CollectionWriter()
with self.make_test_file() as testfile:
TestResumableWriter.from_state,
cwriter.last_state())
+ def test_resume_fails_with_expired_locator(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ cwriter.finish_current_stream()
+ state = cwriter.last_state()
+ # Get the last locator, remove any permission hint, and add
+ # an expired one.
+ new_loc = state['_current_stream_locators'][-1].split('+A', 1)[0]
+ state['_current_stream_locators'][-1] = "{}+A{}@10000000".format(
+ new_loc, 'a' * 40)
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state, state)
+
def test_successful_resumes(self):
# FIXME: This is more of an integration test than a unit test.
cwriter = TestResumableWriter()
- source_tree = self.build_directory_tree()
+ source_tree = self.build_directory_tree(['basefile', 'subdir/subfile'])
with open(os.path.join(source_tree, 'long'), 'w') as longfile:
longfile.write('t' * (cwriter.KEEP_BLOCK_SIZE + 10))
cwriter.write_directory_tree(source_tree)