attr_value = attr_class(attr_value)
setattr(writer, attr_name, attr_value)
# Check dependencies before we try to resume anything.
+ if any(KeepLocator(ls).permission_expired()
+ for ls in writer._current_stream_locators):
+ raise errors.StaleWriterStateError(
+ "locators include expired permission hint")
writer.check_dependencies()
if state['_current_file'] is not None:
path, pos = state['_current_file']
TestResumableWriter.from_state,
cwriter.last_state())
+ def test_resume_fails_with_expired_locator(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ cwriter.finish_current_stream()
+ state = cwriter.last_state()
+ # Get the last locator, remove any permission hint, and add
+ # an expired one.
+ new_loc = state['_current_stream_locators'][-1].split('+A', 1)[0]
+ state['_current_stream_locators'][-1] = "{}+A{}@10000000".format(
+ new_loc, 'a' * 40)
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state, state)
+
def test_successful_resumes(self):
# FIXME: This is more of an integration test than a unit test.
cwriter = TestResumableWriter()