def exit_signal_handler(sigcode, frame):
sys.exit(-sigcode)
-def main(arguments=None):
+def main(arguments=None, output_to=sys.stdout):
args = parse_arguments(arguments)
if args.progress:
except (IOError, OSError):
pass # Couldn't open cache directory/file. Continue without it.
except ResumeCacheConflict:
- print "arv-put: Another process is already uploading this data."
+ output_to.write(
+ "arv-put: Another process is already uploading this data.\n")
sys.exit(1)
if resume_cache is None:
print >>sys.stderr
if args.stream:
- print writer.manifest_text(),
+ output = writer.manifest_text()
elif args.raw:
- print ','.join(writer.data_locators())
+ output = ','.join(writer.data_locators())
else:
# Register the resulting collection in Arvados.
collection = arvados.api().collections().create(
).execute()
# Print the locator (uuid) of the new collection.
- print collection['uuid']
+ output = collection['uuid']
+
+ output_to.write(output)
+ if not output.endswith('\n'):
+ output_to.write('\n')
for sigcode, orig_handler in orig_signal_handlers.items():
signal.signal(sigcode, orig_handler)
if resume_cache is not None:
resume_cache.destroy()
+ return output
+
if __name__ == '__main__':
main()
import unittest
import yaml
+from cStringIO import StringIO
+
import arvados
import arvados.commands.put as arv_put
class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
def call_main_on_test_file(self):
+ self.main_output = StringIO()
with self.make_test_file() as testfile:
path = testfile.name
- arv_put.main(['--stream', '--no-progress', path])
+ arv_put.main(['--stream', '--no-progress', path], self.main_output)
self.assertTrue(
os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
'098f6bcd4621d373cade4e832627b4f6')),