def test_short_put_from_stdin(self):
    """Check that ``arv-put --stream`` accepts a short input on stdin.

    Runs arv-put as a child process with ``self.ENVIRON`` as its
    environment, writes ``'stdin test\\n'`` to its stdin, and gives it up
    to five seconds to exit. The test fails if the child does not exit
    in time or exits non-zero; otherwise the combined stdout/stderr
    output must contain the expected identifier for the 11-byte input
    (presumably its Keep block locator -- confirm against arv-put).

    The child is always terminated (if still running), reaped, and its
    stdout pipe closed, so a failing run leaves no stray process or
    open file descriptor behind.
    """
    # Have to run this as an integration test since arv-put can't
    # read from the tests' stdin.
    # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
    # case, because the /proc entry is already gone by the time it tries.
    pipe = subprocess.Popen(
        [sys.executable, arv_put.__file__, '--stream'],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT, env=self.ENVIRON)
    try:
        pipe.stdin.write('stdin test\n')
        # Closing stdin signals EOF so arv-put stops reading and uploads.
        pipe.stdin.close()
        # Poll instead of wait() so a hung child cannot hang the suite.
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            # The finally clause below terminates the still-running child.
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            # Surface the child's output to make the failure diagnosable.
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
                      pipe.stdout.read())
    finally:
        if pipe.poll() is None:
            pipe.terminate()
            # Reap the child so it does not linger as a zombie.
            pipe.wait()
        pipe.stdout.close()
+