class TestResumableWriter(arvados.ResumableCollectionWriter):
KEEP_BLOCK_SIZE = 1024 # PUT to Keep every 1K.
- def __init__(self):
- self.saved_states = []
- return super(TestResumableWriter, self).__init__()
-
- def checkpoint_state(self):
- self.saved_states.append(self.dump_state(copy.deepcopy))
-
- def last_state(self):
- assert self.saved_states, "resumable writer did not save any state"
- return self.saved_states[-1]
+ def current_state(self):  # Snapshot this writer's state so a test can hand it to from_state().
+ return self.dump_state(copy.deepcopy)  # presumably deep-copied so the snapshot does not alias the writer's live state -- confirm against dump_state
class ArvadosCollectionsTest(ArvadosKeepLocalStoreTestCase):
cw.start_new_file('zero.txt')
cw.write('')
- self.assertEqual(cw.manifest_text(), ". 0:0:zero.txt\n")
+ self.assertEqual(cw.manifest_text(), ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:zero.txt\n")
self.check_manifest_file_sizes(cw.manifest_text(), [0])
cw = arvados.CollectionWriter()
cw.start_new_file('zero.txt')
self.assertEqual(sr.readfrom(25, 5), content[25:30])
self.assertEqual(sr.readfrom(30, 5), '')
+ def test_file_reader(self):  # Read a file made of two segments that are not adjacent in its stream.
+ keepblocks = {'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': 'abcdefghij',  # locator -> block data; each "+N" suffix equals that block's length
+ 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': 'klmnopqrstuvwxy',
+ 'cccccccccccccccccccccccccccccccc+5': 'z0123'}
+ mk = self.MockKeep(keepblocks)  # MockKeep is defined outside this hunk; presumably serves these blocks in place of a real Keep client
+
+ sr = arvados.StreamReader([".", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15", "cccccccccccccccccccccccccccccccc+5", "0:10:foo", "15:10:foo"], mk)
+
+ content = 'abcdefghijpqrstuvwxy'  # equals bytes 0-9 and 15-24 of the concatenated blocks, consistent with "0:10:foo"/"15:10:foo" being offset:length:name
+
+ f = sr.files()["foo"]
+
+ f.seek(0)
+ self.assertEqual(f.read(20), content[0:20])  # a single read covering the whole 20-byte file
+
+ f.seek(0)  # rewind, then read sequentially in 6-byte chunks
+ self.assertEqual(f.read(6), content[0:6])
+ self.assertEqual(f.read(6), content[6:12])  # this read straddles file offset 10, where the first segment ends and the second begins
+ self.assertEqual(f.read(6), content[12:18])
+
def test_extract_file(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md6sum.txt
cwriter.manifest_text(),
". 902fbdd2b1df0c4f70b4a5d23525e932+3 0:1:A 1:1:B 2:1:C\n")
- def test_checkpoint_after_put(self):
- cwriter = TestResumableWriter()
- with self.make_test_file(
- 't' * (cwriter.KEEP_BLOCK_SIZE + 10)) as testfile:
- testpath = os.path.realpath(testfile.name)
- cwriter.write_file(testpath, 'test')
- for state in cwriter.saved_states:
- if state.get('_current_file') == (testpath,
- cwriter.KEEP_BLOCK_SIZE):
- break
- else:
- self.fail("can't find state immediately after PUT to Keep")
- self.assertIn('d45107e93f9052fa88a82fc08bb1d316+1024', # 't' * 1024
- state['_current_stream_locators'])
-
def test_basic_resume(self):
cwriter = TestResumableWriter()
with self.make_test_file() as testfile:
cwriter.write_file(testfile.name, 'test')
- last_state = cwriter.last_state()
- resumed = TestResumableWriter.from_state(last_state)
+ resumed = TestResumableWriter.from_state(cwriter.current_state())
self.assertEquals(cwriter.manifest_text(), resumed.manifest_text(),
"resumed CollectionWriter had different manifest")
cwriter.write_file(testfile.name, 'test')
self.assertRaises(arvados.errors.StaleWriterStateError,
TestResumableWriter.from_state,
- cwriter.last_state())
+ cwriter.current_state())
def test_resume_fails_when_dependency_mtime_changed(self):
cwriter = TestResumableWriter()
os.utime(testfile.name, (0, 0))
self.assertRaises(arvados.errors.StaleWriterStateError,
TestResumableWriter.from_state,
- cwriter.last_state())
+ cwriter.current_state())
def test_resume_fails_when_dependency_is_nonfile(self):
cwriter = TestResumableWriter()
cwriter.write_file('/dev/null', 'empty')
self.assertRaises(arvados.errors.StaleWriterStateError,
TestResumableWriter.from_state,
- cwriter.last_state())
+ cwriter.current_state())
def test_resume_fails_when_dependency_size_changed(self):
cwriter = TestResumableWriter()
os.utime(testfile.name, (orig_mtime, orig_mtime))
self.assertRaises(arvados.errors.StaleWriterStateError,
TestResumableWriter.from_state,
- cwriter.last_state())
+ cwriter.current_state())
def test_resume_fails_with_expired_locator(self):
cwriter = TestResumableWriter()
- with self.make_test_file() as testfile:
- cwriter.write_file(testfile.name, 'test')
- cwriter.finish_current_stream()
- state = cwriter.last_state()
- # Get the last locator, remove any permission hint, and add
- # an expired one.
- new_loc = state['_current_stream_locators'][-1].split('+A', 1)[0]
- state['_current_stream_locators'][-1] = "{}+A{}@10000000".format(
- new_loc, 'a' * 40)
- self.assertRaises(arvados.errors.StaleWriterStateError,
- TestResumableWriter.from_state, state)
-
- def test_successful_resumes(self):
- # FIXME: This is more of an integration test than a unit test.
- cwriter = TestResumableWriter()
- source_tree = self.build_directory_tree(['basefile', 'subdir/subfile'])
- with open(os.path.join(source_tree, 'long'), 'w') as longfile:
- longfile.write('t' * (cwriter.KEEP_BLOCK_SIZE + 10))
- cwriter.write_directory_tree(source_tree)
- # A state for each file, plus a fourth for mid-longfile.
- self.assertGreater(len(cwriter.saved_states), 3,
- "CollectionWriter didn't save enough states to test")
-
- for state in cwriter.saved_states:
- new_writer = TestResumableWriter.from_state(state)
- manifests = [writer.manifest_text()
- for writer in (cwriter, new_writer)]
- self.assertEquals(
- manifests[0], manifests[1],
- "\n".join(["manifest mismatch after resuming from state:",
- pprint.pformat(state), ""] + manifests))
+ state = cwriter.current_state()  # state of a freshly built writer; no file is written before the snapshot
+ # Add an expired locator to the state.
+ state['_current_stream_locators'].append(''.join([
+ 'a' * 32, '+A', 'b' * 40, '@', '10000000']))  # 32-char hash, "+A" hint with a 40-char value, then "@10000000"; presumably the part after "@" is an expiry time long past -- confirm
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state, state)  # resuming from a state that holds this locator must be rejected
def test_arbitrary_objects_not_resumable(self):
cwriter = TestResumableWriter()