# TODO:
# --md5sum - display md5 of each file as read from disk
+import apiclient.errors
import argparse
import arvados
import base64
+import datetime
import errno
import fcntl
import hashlib
import json
import os
+import pwd
import signal
+import socket
import sys
import tempfile
stream per filesystem directory that contains files.
""")
+upload_opts.add_argument('--project-uuid', metavar='UUID', help="""
+When a Collection is made, make a Link to save it under the specified project.
+""")
+
+upload_opts.add_argument('--name', help="""
+When a Collection is linked to a project, use the specified name.
+""")
+
_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument('--as-stream', action='store_true', dest='stream',
class ResumeCache(object):
CACHE_DIR = '.cache/arvados/arv-put'
- @classmethod
- def setup_user_cache(cls):
- return arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700)
-
def __init__(self, file_spec):
self.cache_file = open(file_spec, 'a+')
self._lock_file(self.cache_file)
md5.update(str(max(args.max_manifest_depth, -1)))
elif args.filename:
md5.update(args.filename)
- return os.path.join(cls.CACHE_DIR, md5.hexdigest())
+ return os.path.join(
+ arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
+ md5.hexdigest())
def _lock_file(self, fileobj):
try:
def exit_signal_handler(sigcode, frame):
sys.exit(-sigcode)
-def main(arguments=None, output_to=sys.stdout):
+def check_project_exists(project_uuid):
+ try:
+ arvados.api('v1').groups().get(uuid=project_uuid).execute()
+ except (apiclient.errors.Error, arvados.errors.NotFoundError) as error:
+ raise ValueError("Project {} not found ({})".format(project_uuid,
+ error))
+ else:
+ return True
+
+def prep_project_link(args, stderr, project_exists=check_project_exists):
+ # Given the user's command line arguments, return a dictionary with data
+ # to create the desired project link for this Collection, or None.
+ # Raises ValueError if the arguments request something impossible.
+ making_collection = not (args.raw or args.stream)
+ any_link_spec = args.project_uuid or args.name
+ if not making_collection:
+ if any_link_spec:
+ raise ValueError("Requested a Link without creating a Collection")
+ return None
+ elif not any_link_spec:
+ stderr.write(
+ "arv-put: No --project-uuid or --name specified. This data will be cached\n"
+ "in Keep. You will need to find this upload by its locator(s) later.\n")
+ return None
+ elif not args.project_uuid:
+ raise ValueError("--name requires --project-uuid")
+ elif not project_exists(args.project_uuid):
+ raise ValueError("Project {} not found".format(args.project_uuid))
+ link = {'tail_uuid': args.project_uuid, 'link_class': 'name'}
+ if args.name:
+ link['name'] = args.name
+ return link
+
+def create_project_link(locator, link):
+ link['head_uuid'] = locator
+ link.setdefault('name', "Collection saved by {}@{} at {}".format(
+ pwd.getpwuid(os.getuid()).pw_name,
+ socket.gethostname(),
+ datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")))
+ return arvados.api('v1').links().create(body=link).execute()
+
+def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+ status = 0
+
args = parse_arguments(arguments)
+ try:
+ project_link = prep_project_link(args, stderr)
+ except ValueError as error:
+ print >>stderr, "arv-put: {}.".format(error)
+ sys.exit(2)
if args.progress:
reporter = progress_writer(human_progress)
resume_cache = None
try:
- if ResumeCache.setup_user_cache() is not None:
- resume_cache = ResumeCache(ResumeCache.make_path(args))
- except (IOError, OSError):
+ resume_cache = ResumeCache(ResumeCache.make_path(args))
+ except (IOError, OSError, ValueError):
pass # Couldn't open cache directory/file. Continue without it.
except ResumeCacheConflict:
- output_to.write(
+ stdout.write(
"arv-put: Another process is already uploading this data.\n")
sys.exit(1)
for sigcode in CAUGHT_SIGNALS}
if writer.bytes_written > 0: # We're resuming a previous upload.
- print >>sys.stderr, "\n".join([
+ print >>stderr, "\n".join([
"arv-put: Resuming previous upload from last checkpoint.",
" Use the --no-resume option to start over."])
- writer.report_progress()
+ writer.report_progress()
writer.do_queued_work() # Do work resumed from cache.
for path in args.paths: # Copy file data to Keep.
if os.path.isdir(path):
writer.finish_current_stream()
if args.progress: # Print newline to split stderr from stdout for humans.
- print >>sys.stderr
+ print >>stderr
if args.stream:
output = writer.manifest_text()
# Print the locator (uuid) of the new collection.
output = collection['uuid']
-
- output_to.write(output)
+ if project_link is not None:
+ try:
+ create_project_link(output, project_link)
+ except apiclient.errors.Error as error:
+ print >>stderr, (
+ "arv-put: Error adding Collection to project: {}.".format(
+ error))
+ status = 1
+
+ stdout.write(output)
if not output.endswith('\n'):
- output_to.write('\n')
+ stdout.write('\n')
for sigcode, orig_handler in orig_signal_handlers.items():
signal.signal(sigcode, orig_handler)
+ if status != 0:
+ sys.exit(status)
+
if resume_cache is not None:
resume_cache.destroy()
import apiclient
import os
+import pwd
import re
import shutil
import subprocess
arv_put.human_progress(count, None)))
+class ArvadosPutProjectLinkTest(ArvadosBaseTestCase):
+ Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
+
+ def setUp(self):
+ self.stderr = StringIO()
+ super(ArvadosPutProjectLinkTest, self).setUp()
+
+ def tearDown(self):
+ self.stderr.close()
+ super(ArvadosPutProjectLinkTest, self).tearDown()
+
+ def prep_link_from_arguments(self, args, uuid_found=True):
+ try:
+ link = arv_put.prep_project_link(arv_put.parse_arguments(args),
+ self.stderr,
+ lambda uuid: uuid_found)
+ finally:
+ self.stderr.seek(0)
+ return link
+
+ def check_link(self, link, project_uuid, link_name=None):
+ self.assertEqual(project_uuid, link.get('tail_uuid'))
+ self.assertEqual('name', link.get('link_class'))
+ if link_name is None:
+ self.assertNotIn('name', link)
+ else:
+ self.assertEqual(link_name, link.get('name'))
+ self.assertNotIn('head_uuid', link)
+
+ def check_stderr_empty(self):
+ self.assertEqual('', self.stderr.getvalue())
+
+ def test_project_link_with_name(self):
+ link = self.prep_link_from_arguments(['--project-uuid', self.Z_UUID,
+ '--name', 'test link AAA'])
+ self.check_link(link, self.Z_UUID, 'test link AAA')
+ self.check_stderr_empty()
+
+ def test_project_link_without_name(self):
+ link = self.prep_link_from_arguments(['--project-uuid', self.Z_UUID])
+ self.check_link(link, self.Z_UUID)
+ self.check_stderr_empty()
+
+ def test_collection_without_project_warned(self):
+ self.assertIsNone(self.prep_link_from_arguments([]))
+ for line in self.stderr:
+ if "--project-uuid or --name" in line:
+ break
+ else:
+ self.fail("no warning emitted about the lack of project name")
+
+ def test_no_link_or_warning_with_no_collection(self):
+ self.assertIsNone(self.prep_link_from_arguments(['--raw']))
+ self.check_stderr_empty()
+
+ def test_error_when_project_not_found(self):
+ self.assertRaises(ValueError,
+ self.prep_link_from_arguments,
+ ['--project-uuid', self.Z_UUID], False)
+
+ def test_name_without_project_is_error(self):
+ self.assertRaises(ValueError,
+ self.prep_link_from_arguments,
+ ['--name', 'test'])
+
+ def test_link_without_collection_is_error(self):
+ self.assertRaises(ValueError,
+ self.prep_link_from_arguments,
+ ['--project-uuid', self.Z_UUID, '--stream'])
+
+
class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
+ def call_main_with_args(self, args):
+ self.main_stdout = StringIO()
+ self.main_stderr = StringIO()
+ return arv_put.main(args, self.main_stdout, self.main_stderr)
+
def call_main_on_test_file(self):
- self.main_output = StringIO()
with self.make_test_file() as testfile:
path = testfile.name
- arv_put.main(['--stream', '--no-progress', path], self.main_output)
+ self.call_main_with_args(['--stream', '--no-progress', path])
self.assertTrue(
os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
'098f6bcd4621d373cade4e832627b4f6')),
"did not find file stream in Keep store")
+ def tearDown(self):
+ for outbuf in ['main_stdout', 'main_stderr']:
+ if hasattr(self, outbuf):
+ getattr(self, outbuf).close()
+ delattr(self, outbuf)
+ super(ArvadosPutTest, self).tearDown()
+
def test_simple_file_put(self):
self.call_main_on_test_file()
self.fail("arv-put returned exit code {}".format(returncode))
self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())
+ def test_link_without_project_uuid_aborts(self):
+ self.assertRaises(SystemExit, self.call_main_with_args,
+ ['--name', 'test without project UUID', '/dev/null'])
+
+ def test_link_without_collection_aborts(self):
+ self.assertRaises(SystemExit, self.call_main_with_args,
+ ['--name', 'test without Collection',
+ '--stream', '/dev/null'])
class ArvPutIntegrationTest(unittest.TestCase):
+ PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
+
@classmethod
def setUpClass(cls):
try:
run_test_server.stop()
run_test_server.stop_keep()
- def test_ArvPutSignedManifest(self):
- # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
- # the newly created manifest from the API server, testing to confirm
- # that the block locators in the returned manifest are signed.
- run_test_server.authorize_with('active')
+ def authorize_with(self, token_name):
+ run_test_server.authorize_with(token_name)
for v in ["ARVADOS_API_HOST",
"ARVADOS_API_HOST_INSECURE",
"ARVADOS_API_TOKEN"]:
os.environ[v] = arvados.config.settings()[v]
+ def test_check_real_project_found(self):
+ self.assertTrue(arv_put.check_project_exists(self.PROJECT_UUID),
+ "did not correctly find test fixture project")
+
+ def test_check_error_finding_nonexistent_project(self):
+ BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
+ try:
+ result = arv_put.check_project_exists(BAD_UUID)
+ except ValueError as error:
+ self.assertIn(BAD_UUID, error.message)
+ else:
+ self.assertFalse(result, "incorrectly found nonexistent project")
+
+ def test_ArvPutSignedManifest(self):
+ # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
+ # the newly created manifest from the API server, testing to confirm
+ # that the block locators in the returned manifest are signed.
+ self.authorize_with('active')
+
# Before doing anything, demonstrate that the collection
# we're about to create is not present in our test fixture.
api = arvados.api('v1', cache=False)
os.remove(os.path.join(datadir, "foo"))
os.rmdir(datadir)
+ def run_and_find_link(self, text, extra_args=[]):
+ self.authorize_with('active')
+ pipe = subprocess.Popen(
+ [sys.executable, arv_put.__file__,
+ '--project-uuid', self.PROJECT_UUID] + extra_args,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = pipe.communicate(text)
+ link_list = arvados.api('v1', cache=False).links().list(
+ filters=[['head_uuid', '=', stdout.strip()],
+ ['tail_uuid', '=', self.PROJECT_UUID],
+ ['link_class', '=', 'name']]).execute().get('items', [])
+ self.assertEqual(1, len(link_list))
+ return link_list[0]
+
+ def test_put_collection_with_unnamed_project_link(self):
+ link = self.run_and_find_link("Test unnamed collection")
+ username = pwd.getpwuid(os.getuid()).pw_name
+ self.assertRegexpMatches(
+ link['name'],
+ r'^Collection saved by {}@'.format(re.escape(username)))
+
+ def test_put_collection_with_named_project_link(self):
+ link_name = 'Test auto Collection Link'
+ link = self.run_and_find_link("Test named collection",
+ ['--name', link_name])
+ self.assertEqual(link_name, link['name'])
+
if __name__ == '__main__':
unittest.main()