def test_resume_fails_with_expired_locator(self):
    """Resuming from a state whose last locator hint is expired must fail.

    Writes one file through a ``TestResumableWriter``, captures the
    writer's last saved state, replaces the permission hint on the final
    stream locator with an expired one, and expects
    ``TestResumableWriter.from_state`` to raise
    ``arvados.errors.StaleWriterStateError``.
    """
    writer = TestResumableWriter()
    # NOTE(review): the original indentation was lost; the whole scenario
    # is kept inside the ``with`` so the test file still exists when the
    # state is resumed -- confirm against the upstream layout.
    with self.make_test_file() as testfile:
        writer.write_file(testfile.name, 'test')
        writer.finish_current_stream()
        saved_state = writer.last_state()
        locators = saved_state['_current_stream_locators']
        # Keep only the part of the last locator that precedes any
        # existing permission hint ('+A...'); a locator with no hint is
        # kept whole.
        bare_locator, _, _ = locators[-1].partition('+A')
        # Attach a replacement hint that is already expired.
        fake_signature = 'a' * 40
        locators[-1] = "{}+A{}@10000000".format(bare_locator, fake_signature)
        with self.assertRaises(arvados.errors.StaleWriterStateError):
            TestResumableWriter.from_state(saved_state)
+