X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f3b8d03f7063b162355bccfd71aeb2b8b67bbdbb..HEAD:/sdk/python/arvados/commands/arv_copy.py diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py index 7f5245db86..9914cc3ef6 100755 --- a/sdk/python/arvados/commands/arv_copy.py +++ b/sdk/python/arvados/commands/arv_copy.py @@ -13,17 +13,11 @@ # --no-recursive is given, arv-copy copies only the single record # identified by object-uuid. # -# The user must have files $HOME/.config/arvados/{src}.conf and -# $HOME/.config/arvados/{dst}.conf with valid login credentials for -# instances src and dst. If either of these files is not found, +# The user must have configuration files {src}.conf and +# {dst}.conf in a standard configuration directory with valid login credentials +# for instances src and dst. If either of these files is not found, # arv-copy will issue an error. -from __future__ import division -from future import standard_library -from future.utils import listvalues -standard_library.install_aliases() -from past.builtins import basestring -from builtins import object import argparse import contextlib import getpass @@ -39,6 +33,10 @@ import io import json import queue import threading +import errno + +import httplib2.error +import googleapiclient import arvados import arvados.config @@ -46,15 +44,20 @@ import arvados.keep import arvados.util import arvados.commands._util as arv_cmd import arvados.commands.keepdocker -import arvados.http_to_keep -import ruamel.yaml as yaml +from arvados.logging import log_handler +from arvados._internal import basedirs, http_to_keep, s3_to_keep, to_keep_util from arvados._version import __version__ COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$') +arvlogger = logging.getLogger('arvados') +keeplogger = logging.getLogger('arvados.keep') logger = logging.getLogger('arvados.arv-copy') +# Set this up so connection errors get logged. +googleapi_logger = logging.getLogger('googleapiclient.http') + # local_repo_dir records which git repositories from the Arvados source # instance have been checked out locally during this run, and to which # directories. @@ -93,10 +96,22 @@ def main(): help='Perform copy even if the object appears to exist at the remote destination.') copy_opts.add_argument( '--src', dest='source_arvados', - help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will be inferred from the UUID of the object being copied.') + help=""" +Client configuration location for the source Arvados cluster. +May be either a configuration file path, or a plain identifier like `foo` +to search for a configuration file `foo.conf` under a systemd or XDG configuration directory. +If not provided, will search for a configuration file named after the cluster ID of the source object UUID. +""", + ) copy_opts.add_argument( '--dst', dest='destination_arvados', - help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will use ARVADOS_API_HOST from environment.') + help=""" +Client configuration location for the destination Arvados cluster. +May be either a configuration file path, or a plain identifier like `foo` +to search for a configuration file `foo.conf` under a systemd or XDG configuration directory. +If not provided, will use the default client configuration from the environment or `settings.conf`. +""", + ) copy_opts.add_argument( '--recursive', dest='recursive', action='store_true', help='Recursively copy any dependencies for this object, and subprojects. (default)') @@ -107,7 +122,18 @@ def main(): '--project-uuid', dest='project_uuid', help='The UUID of the project at the destination to which the collection or workflow should be copied.') copy_opts.add_argument( - '--storage-classes', dest='storage_classes', + '--replication', + type=arv_cmd.RangedValue(int, range(1, sys.maxsize)), + metavar='N', + help=""" +Number of replicas per storage class for the copied collections at the destination. +If not provided (or if provided with invalid value), +use the destination's default replication-level setting (if found), +or the fallback value 2. +""") + copy_opts.add_argument( + '--storage-classes', + type=arv_cmd.UniqueSplit(), help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.') copy_opts.add_argument("--varying-url-params", type=str, default="", help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.") @@ -126,21 +152,38 @@ def main(): parents=[copy_opts, arv_cmd.retry_opt]) args = parser.parse_args() - if args.storage_classes: - args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x] - if args.verbose: - logger.setLevel(logging.DEBUG) + arvlogger.setLevel(logging.DEBUG) else: - logger.setLevel(logging.INFO) + arvlogger.setLevel(logging.INFO) + keeplogger.setLevel(logging.WARNING) if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid): args.source_arvados = args.object_uuid[:5] + if not args.destination_arvados and args.project_uuid: + args.destination_arvados = args.project_uuid[:5] + + # Make sure errors trying to connect to clusters get logged. + googleapi_logger.setLevel(logging.WARN) + googleapi_logger.addHandler(log_handler) + # Create API clients for the source and destination instances src_arv = api_for_instance(args.source_arvados, args.retries) dst_arv = api_for_instance(args.destination_arvados, args.retries) + # Once we've successfully contacted the clusters, we probably + # don't want to see logging about retries (unless the user asked + # for verbose output). + if not args.verbose: + googleapi_logger.setLevel(logging.ERROR) + + if src_arv.config()["ClusterID"] == dst_arv.config()["ClusterID"]: + logger.info("Copying within cluster %s", src_arv.config()["ClusterID"]) + else: + logger.info("Source cluster is %s", src_arv.config()["ClusterID"]) + logger.info("Destination cluster is %s", dst_arv.config()["ClusterID"]) + if not args.project_uuid: args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"] @@ -159,8 +202,8 @@ def main(): elif t == 'Group': set_src_owner_uuid(src_arv.groups(), args.object_uuid, args) result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args) - elif t == 'httpURL': - result = copy_from_http(args.object_uuid, src_arv, dst_arv, args) + elif t == 'httpURL' or t == 's3URL': + result = copy_from_url(args.object_uuid, src_arv, dst_arv, args) else: abort("cannot copy object {} of type {}".format(args.object_uuid, t)) except Exception as e: @@ -168,7 +211,7 @@ def main(): exit(1) # Clean up any outstanding temp git repositories. - for d in listvalues(local_repo_dir): + for d in local_repo_dir.values(): shutil.rmtree(d, ignore_errors=True) if not result: @@ -204,41 +247,68 @@ def set_src_owner_uuid(resource, uuid, args): # (either local or absolute) to a file with Arvados configuration # settings. # -# Otherwise, it is presumed to be the name of a file in -# $HOME/.config/arvados/instance_name.conf +# Otherwise, it is presumed to be the name of a file in a standard +# configuration directory. # def api_for_instance(instance_name, num_retries): - if not instance_name: - # Use environment - return arvados.api('v1') - - if '/' in instance_name: - config_file = instance_name - else: - config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name)) + msg = [] + if instance_name: + if '/' in instance_name: + config_file = instance_name + else: + dirs = basedirs.BaseDirectories('CONFIG') + config_file = next(dirs.search(f'{instance_name}.conf'), '') + try: + cfg = arvados.config.load(config_file) + + if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg: + api_is_insecure = ( + cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set( + ['1', 't', 'true', 'y', 'yes'])) + return arvados.api('v1', + host=cfg['ARVADOS_API_HOST'], + token=cfg['ARVADOS_API_TOKEN'], + insecure=api_is_insecure, + num_retries=num_retries, + ) + else: + msg.append('missing ARVADOS_API_HOST or ARVADOS_API_TOKEN for {} in config file {}'.format(instance_name, config_file)) + except OSError as e: + if e.errno in (errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ENETUNREACH): + verb = 'connect to instance from' + elif config_file: + verb = 'open' + else: + verb = 'find' + searchlist = ":".join(str(p) for p in dirs.search_paths()) + config_file = f'{instance_name}.conf in path {searchlist}' + msg.append(("Could not {} config file {}: {}").format( + verb, config_file, e.strerror)) + except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error) as e: + msg.append("Failed to connect to instance {} at {}, error was {}".format(instance_name, cfg['ARVADOS_API_HOST'], e)) + + default_api = None + default_instance = None try: - cfg = arvados.config.load(config_file) - except (IOError, OSError) as e: - abort(("Could not open config file {}: {}\n" + - "You must make sure that your configuration tokens\n" + - "for Arvados instance {} are in {} and that this\n" + - "file is readable.").format( - config_file, e, instance_name, config_file)) - - if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg: - api_is_insecure = ( - cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set( - ['1', 't', 'true', 'y', 'yes'])) - client = arvados.api('v1', - host=cfg['ARVADOS_API_HOST'], - token=cfg['ARVADOS_API_TOKEN'], - insecure=api_is_insecure, - num_retries=num_retries, - ) - else: - abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name)) - return client + default_api = arvados.api('v1', num_retries=num_retries) + default_instance = default_api.config()["ClusterID"] + except ValueError: + pass + except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error, OSError) as e: + msg.append("Failed to connect to default instance, error was {}".format(e)) + + if default_api is not None and (not instance_name or instance_name == default_instance): + # Use default settings + return default_api + + if instance_name and default_instance and instance_name != default_instance: + msg.append("Default credentials are for {} but need to connect to {}".format(default_instance, instance_name)) + + for m in msg: + logger.error(m) + + abort('Unable to find usable ARVADOS_API_HOST and ARVADOS_API_TOKEN') # Check if git is available def check_git_availability(): @@ -258,10 +328,10 @@ def filter_iter(arg): Pass in a filter field that can either be a string or list. This will iterate elements as if the field had been written as a list. """ - if isinstance(arg, basestring): - return iter((arg,)) + if isinstance(arg, str): + yield arg else: - return iter(arg) + yield from arg def migrate_repository_filter(repo_filter, src_repository, dst_repository): """Update a single repository filter in-place for the destination. @@ -331,8 +401,12 @@ def copy_workflow(wf_uuid, src, dst, args): "ARVADOS_API_TOKEN": src.api_token, "PATH": os.environ["PATH"]} try: - result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid], - capture_output=True, env=env) + result = subprocess.run( + ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid], + env=env, + stdout=subprocess.PIPE, + universal_newlines=True, + ) except FileNotFoundError: no_arv_copy = True else: @@ -409,7 +483,7 @@ def copy_collections(obj, src, dst, args): collections_copied[src_id] = dst_col['uuid'] return collections_copied[src_id] - if isinstance(obj, basestring): + if isinstance(obj, str): # Copy any collections identified in this string to dst, replacing # them with the dst uuids as necessary. obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj) @@ -572,6 +646,14 @@ def copy_collection(obj_uuid, src, dst, args): ).execute(num_retries=args.retries)['manifest_text'] return create_collection_from(c, src, dst, args) + if args.replication is None: + # Obtain default or fallback collection replication setting on the + # destination + try: + args.replication = int(dst.config()["Collections"]["DefaultReplication"]) + except (KeyError, TypeError, ValueError): + args.replication = 2 + # Fetch the collection's manifest. manifest = c['manifest_text'] logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest) @@ -631,7 +713,7 @@ def copy_collection(obj_uuid, src, dst, args): logger.debug("Getting block %s", word) data = src_keep.get(word) put_queue.put((word, data)) - except e: + except Exception as e: logger.error("Error getting block %s: %s", word, e) transfer_error.append(e) try: @@ -663,13 +745,13 @@ def copy_collection(obj_uuid, src, dst, args): try: logger.debug("Putting block %s (%s bytes)", blockhash, loc.size) - dst_locator = dst_keep.put(data, classes=(args.storage_classes or [])) + dst_locator = dst_keep.put(data, copies=args.replication, classes=(args.storage_classes or [])) with lock: dst_locators[blockhash] = dst_locator bytes_written += loc.size if progress_writer: progress_writer.report(obj_uuid, bytes_written, bytes_expected) - except e: + except Exception as e: logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e) try: # Drain the 'get' queue so we end early @@ -736,58 +818,6 @@ def copy_collection(obj_uuid, src, dst, args): c['manifest_text'] = dst_manifest.getvalue() return create_collection_from(c, src, dst, args) -def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt): - r = api.repositories().list( - filters=[['name', '=', repo_name]]).execute(num_retries=retries) - if r['items_available'] != 1: - raise Exception('cannot identify repo {}; {} repos found' - .format(repo_name, r['items_available'])) - - https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")] - http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")] - other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")] - - priority = https_url + other_url + http_url - - for url in priority: - if url.startswith("http"): - u = urllib.parse.urlsplit(url) - baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", "")) - git_config = ["-c", "credential.%s/.username=none" % baseurl, - "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl] - else: - git_config = [] - - try: - logger.debug("trying %s", url) - subprocess.run( - ['git', *git_config, 'ls-remote', url], - check=True, - env={ - 'ARVADOS_API_TOKEN': api.api_token, - 'GIT_ASKPASS': '/bin/false', - 'HOME': os.environ['HOME'], - }, - stdout=subprocess.DEVNULL, - ) - except subprocess.CalledProcessError: - pass - else: - git_url = url - break - else: - raise Exception('Cannot access git repository, tried {}' - .format(priority)) - - if git_url.startswith("http:"): - if allow_insecure_http: - logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt) - else: - raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt)) - - return (git_url, git_config) - - def copy_docker_image(docker_image, docker_image_tag, src, dst, args): """Copy the docker image identified by docker_image and docker_image_tag from src to dst. Create appropriate @@ -894,6 +924,9 @@ def uuid_type(api, object_uuid): if object_uuid.startswith("http:") or object_uuid.startswith("https:"): return 'httpURL' + if object_uuid.startswith("s3:"): + return 's3URL' + p = object_uuid.split('-') if len(p) == 3: type_prefix = p[1] @@ -904,21 +937,34 @@ def uuid_type(api, object_uuid): return None -def copy_from_http(url, src, dst, args): +def copy_from_url(url, src, dst, args): project_uuid = args.project_uuid - varying_url_params = args.varying_url_params + # Ensure string of varying parameters is well-formed prefer_cached_downloads = args.prefer_cached_downloads - cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {}, - varying_url_params=varying_url_params, - prefer_cached_downloads=prefer_cached_downloads) + cached = to_keep_util.CheckCacheResult(None, None, None, None, None) + + if url.startswith("http:") or url.startswith("https:"): + cached = http_to_keep.check_cached_url(src, project_uuid, url, {}, + varying_url_params=args.varying_url_params, + prefer_cached_downloads=prefer_cached_downloads) + elif url.startswith("s3:"): + import boto3.session + botosession = boto3.session.Session() + cached = s3_to_keep.check_cached_url(src, botosession, project_uuid, url, {}, + prefer_cached_downloads=prefer_cached_downloads) + if cached[2] is not None: return copy_collection(cached[2], src, dst, args) - cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url, - varying_url_params=varying_url_params, - prefer_cached_downloads=prefer_cached_downloads) + if url.startswith("http:") or url.startswith("https:"): + cached = http_to_keep.http_to_keep(dst, project_uuid, url, + varying_url_params=args.varying_url_params, + prefer_cached_downloads=prefer_cached_downloads) + elif url.startswith("s3:"): + cached = s3_to_keep.s3_to_keep(dst, botosession, project_uuid, url, + prefer_cached_downloads=prefer_cached_downloads) if cached is not None: return {"uuid": cached[2]}