Merge branch '21666-provision-test-improvement'
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
index 79dabd38b2d847072aa7d8eccb119c0224469244..eccf488efb4dca5bfadb5613336d55035238c7fa 100755 (executable)
 # instances src and dst.  If either of these files is not found,
 # arv-copy will issue an error.
 
-from __future__ import division
-from future import standard_library
-from future.utils import listvalues
-standard_library.install_aliases()
-from past.builtins import basestring
-from builtins import object
 import argparse
 import contextlib
 import getpass
 import os
 import re
 import shutil
+import subprocess
 import sys
 import logging
 import tempfile
 import urllib.parse
 import io
+import json
+import queue
+import threading
 
 import arvados
 import arvados.config
@@ -42,9 +40,8 @@ import arvados.keep
 import arvados.util
 import arvados.commands._util as arv_cmd
 import arvados.commands.keepdocker
-import ruamel.yaml as yaml
+import arvados.http_to_keep
 
-from arvados.api import OrderedJsonModel
 from arvados._version import __version__
 
 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
@@ -89,10 +86,10 @@ def main():
         help='Perform copy even if the object appears to exist at the remote destination.')
     copy_opts.add_argument(
         '--src', dest='source_arvados',
-        help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
+        help='The cluster id of the source Arvados instance. May instead be a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, it is inferred from the UUID of the object being copied.')
     copy_opts.add_argument(
         '--dst', dest='destination_arvados',
-        help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
+        help='The name of the destination Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, ARVADOS_API_HOST from the environment is used.')
     copy_opts.add_argument(
         '--recursive', dest='recursive', action='store_true',
         help='Recursively copy any dependencies for this object, and subprojects. (default)')
@@ -105,6 +102,11 @@ def main():
     copy_opts.add_argument(
         '--storage-classes', dest='storage_classes',
         help='Comma separated list of storage classes to be used when saving data to the destination Arvados instance.')
+    copy_opts.add_argument("--varying-url-params", type=str, default="",
+                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
 
     copy_opts.add_argument(
         'object_uuid',
@@ -113,7 +115,7 @@ def main():
     copy_opts.set_defaults(recursive=True)
 
     parser = argparse.ArgumentParser(
-        description='Copy a workflow or collection from one Arvados instance to another.',
+        description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
         parents=[copy_opts, arv_cmd.retry_opt])
     args = parser.parse_args()
 
@@ -125,43 +127,59 @@ def main():
     else:
         logger.setLevel(logging.INFO)
 
-    if not args.source_arvados:
+    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
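+        # The first five characters of an Arvados UUID are its cluster id.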
         args.source_arvados = args.object_uuid[:5]
 
     # Create API clients for the source and destination instances
-    src_arv = api_for_instance(args.source_arvados)
-    dst_arv = api_for_instance(args.destination_arvados)
+    src_arv = api_for_instance(args.source_arvados, args.retries)
+    dst_arv = api_for_instance(args.destination_arvados, args.retries)
 
     if not args.project_uuid:
         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
 
     # Identify the kind of object we have been given, and begin copying.
     t = uuid_type(src_arv, args.object_uuid)
-    if t == 'Collection':
-        set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
-        result = copy_collection(args.object_uuid,
-                                 src_arv, dst_arv,
-                                 args)
-    elif t == 'Workflow':
-        set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
-        result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
-    elif t == 'Group':
-        set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
-        result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
-    else:
-        abort("cannot copy object {} of type {}".format(args.object_uuid, t))
+
+    try:
+        if t == 'Collection':
+            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
+            result = copy_collection(args.object_uuid,
+                                     src_arv, dst_arv,
+                                     args)
+        elif t == 'Workflow':
+            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
+            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
+        elif t == 'Group':
+            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
+            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
+        elif t == 'httpURL':
+            result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
+        else:
+            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
+    except Exception as e:
+        logger.error("%s", e, exc_info=args.verbose)
+        exit(1)
 
     # Clean up any outstanding temp git repositories.
-    for d in listvalues(local_repo_dir):
+    for d in local_repo_dir.values():
         shutil.rmtree(d, ignore_errors=True)
 
+    if not result:
+        exit(1)
+
     # If no exception was thrown and the response does not have an
     # error_token field, presume success
-    if 'error_token' in result or 'uuid' not in result:
-        logger.error("API server returned an error result: {}".format(result))
+    if 'error_token' in result or 'uuid' not in result:
+        logger.error("API server returned an error result: {}".format(result))
+        exit(1)
+
+    print(result['uuid'])
+
+    if result.get('partial_error'):
+        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
         exit(1)
 
-    logger.info("")
     logger.info("Success: created copy with uuid {}".format(result['uuid']))
     exit(0)
 
@@ -182,10 +200,10 @@ def set_src_owner_uuid(resource, uuid, args):
 #     Otherwise, it is presumed to be the name of a file in
 #     $HOME/.config/arvados/instance_name.conf
 #
-def api_for_instance(instance_name):
+def api_for_instance(instance_name, num_retries):
     if not instance_name:
         # Use environment
-        return arvados.api('v1', model=OrderedJsonModel())
+        return arvados.api('v1')
 
     if '/' in instance_name:
         config_file = instance_name
@@ -209,7 +227,8 @@ def api_for_instance(instance_name):
                              host=cfg['ARVADOS_API_HOST'],
                              token=cfg['ARVADOS_API_TOKEN'],
                              insecure=api_is_insecure,
-                             model=OrderedJsonModel())
+                             num_retries=num_retries,
+                             )
     else:
         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
     return client
@@ -217,8 +236,12 @@ def api_for_instance(instance_name):
 # Check if git is available
 def check_git_availability():
     try:
-        arvados.util.run_command(['git', '--help'])
-    except Exception:
+        subprocess.run(
+            ['git', '--version'],
+            check=True,
+            stdout=subprocess.DEVNULL,
+        )
+    except (FileNotFoundError, subprocess.CalledProcessError):
         abort('git command is not available. Please ensure git is installed.')
 
 
@@ -228,10 +251,10 @@ def filter_iter(arg):
     Pass in a filter field that can either be a string or list.
     This will iterate elements as if the field had been written as a list.
     """
-    if isinstance(arg, basestring):
-        return iter((arg,))
+    if isinstance(arg, str):
+        yield arg
     else:
-        return iter(arg)
+        yield from arg
 
 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
     """Update a single repository filter in-place for the destination.
@@ -292,23 +315,31 @@ def copy_workflow(wf_uuid, src, dst, args):
     # fetch the workflow from the source instance
     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
 
+    if not wf["definition"]:
+        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
+
     # copy collections and docker images
-    if args.recursive:
-        wf_def = yaml.safe_load(wf["definition"])
-        if wf_def is not None:
-            locations = []
-            docker_images = {}
-            graph = wf_def.get('$graph', None)
-            if graph is not None:
-                workflow_collections(graph, locations, docker_images)
-            else:
-                workflow_collections(wf_def, locations, docker_images)
+    if args.recursive and wf["definition"]:
+        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
+               "ARVADOS_API_TOKEN": src.api_token,
+               "PATH": os.environ["PATH"]}
+        try:
+            result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
+                                    capture_output=True, env=env)
+        except FileNotFoundError:
+            no_arv_copy = True
+        else:
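+            # argparse exits with status 2 on an unrecognized option,
+            # meaning the installed arvados-cwl-runner predates
+            # --print-keep-deps (added in 2.7.1).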
+            no_arv_copy = result.returncode == 2
+
+        if no_arv_copy:
+            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
+        elif result.returncode != 0:
+            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')
 
-            if locations:
-                copy_collections(locations, src, dst, args)
+        locations = json.loads(result.stdout)
 
-            for image in docker_images:
-                copy_docker_image(image, docker_images[image], src, dst, args)
+        if locations:
+            copy_collections(locations, src, dst, args)
 
     # copy the workflow itself
     del wf['uuid']
@@ -371,7 +402,7 @@ def copy_collections(obj, src, dst, args):
                 collections_copied[src_id] = dst_col['uuid']
         return collections_copied[src_id]
 
-    if isinstance(obj, basestring):
+    if isinstance(obj, str):
         # Copy any collections identified in this string to dst, replacing
         # them with the dst uuids as necessary.
         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
@@ -552,6 +583,125 @@ def copy_collection(obj_uuid, src, dst, args):
     else:
         progress_writer = None
 
+    # Copy blocks through a two-stage thread pipeline:
+    #   1. Scan the manifest and put each block locator on the 'get' queue.
+    #   2. 'get' threads fetch each block from the source and put the data
+    #      on the 'put' queue.
+    #   3. 'put' threads store each block on the destination and record the
+    #      new locator in dst_locators.
+    #
+    # After the whole manifest has been processed this way, we walk it a
+    # second time and build dst_manifest from dst_locators.
+
+    lock = threading.Lock()
+
+    # the get queue should be unbounded because we'll add all the
+    # block hashes we want to get, but these are small
+    get_queue = queue.Queue()
+
+    threadcount = 4
+
+    # the put queue contains full data blocks
+    # and if 'get' is faster than 'put' we could end up consuming
+    # a great deal of RAM if it isn't bounded.
+    put_queue = queue.Queue(threadcount)
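+    # Exceptions raised in worker threads are collected here so the main
+    # thread can detect failure after the queues drain.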
+    transfer_error = []
+
+    def get_thread():
+        while True:
+            word = get_queue.get()
+            if word is None:
+                put_queue.put(None)
+                get_queue.task_done()
+                return
+
+            blockhash = arvados.KeepLocator(word).md5sum
+            with lock:
+                if blockhash in dst_locators:
+                    # Already uploaded
+                    get_queue.task_done()
+                    continue
+
+            try:
+                logger.debug("Getting block %s", word)
+                data = src_keep.get(word)
+                put_queue.put((word, data))
+            except Exception as e:
+                logger.error("Error getting block %s: %s", word, e)
+                transfer_error.append(e)
+                try:
+                    # Drain the 'get' queue so we end early
+                    while True:
+                        get_queue.get(False)
+                        get_queue.task_done()
+                except queue.Empty:
+                    pass
+            finally:
+                get_queue.task_done()
+
+    def put_thread():
+        nonlocal bytes_written
+        while True:
+            item = put_queue.get()
+            if item is None:
+                put_queue.task_done()
+                return
+
+            word, data = item
+            loc = arvados.KeepLocator(word)
+            blockhash = loc.md5sum
+            with lock:
+                if blockhash in dst_locators:
+                    # Already uploaded
+                    put_queue.task_done()
+                    continue
+
+            try:
+                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
+                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+                with lock:
+                    dst_locators[blockhash] = dst_locator
+                    bytes_written += loc.size
+                    if progress_writer:
+                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
+            except Exception as e:
+                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
+                try:
+                    # Drain the 'get' queue so we end early
+                    while True:
+                        get_queue.get(False)
+                        get_queue.task_done()
+                except queue.Empty:
+                    pass
+                transfer_error.append(e)
+            finally:
+                put_queue.task_done()
+
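+    # Scan the manifest and enqueue every block locator for the workers.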
+    for line in manifest.splitlines():
+        words = line.split()
+        for word in words[1:]:
+            try:
+                loc = arvados.KeepLocator(word)
+            except ValueError:
+                # If 'word' can't be parsed as a locator,
+                # presume it's a filename.
+                continue
+
+            get_queue.put(word)
+
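+    # Enqueue one sentinel per worker: each get thread forwards its None to
+    # the put queue, so every put thread also receives one and exits.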
+    for i in range(0, threadcount):
+        get_queue.put(None)
+
+    for i in range(0, threadcount):
+        threading.Thread(target=get_thread, daemon=True).start()
+
+    for i in range(0, threadcount):
+        threading.Thread(target=put_thread, daemon=True).start()
+
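+    # Block until every enqueued item, including the sentinels, has been
+    # processed.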
+    get_queue.join()
+    put_queue.join()
+
+    if transfer_error:
+        return {"error_token": "Failed to transfer blocks"}
+
     for line in manifest.splitlines():
         words = line.split()
         dst_manifest.write(words[0])
@@ -565,16 +715,6 @@ def copy_collection(obj_uuid, src, dst, args):
                 dst_manifest.write(word)
                 continue
             blockhash = loc.md5sum
-            # copy this block if we haven't seen it before
-            # (otherwise, just reuse the existing dst_locator)
-            if blockhash not in dst_locators:
-                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
-                if progress_writer:
-                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
-                data = src_keep.get(word)
-                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
-                dst_locators[blockhash] = dst_locator
-                bytes_written += loc.size
             dst_manifest.write(' ')
             dst_manifest.write(dst_locators[blockhash])
         dst_manifest.write("\n")
@@ -589,55 +729,6 @@ def copy_collection(obj_uuid, src, dst, args):
     c['manifest_text'] = dst_manifest.getvalue()
     return create_collection_from(c, src, dst, args)
 
-def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
-    r = api.repositories().list(
-        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
-    if r['items_available'] != 1:
-        raise Exception('cannot identify repo {}; {} repos found'
-                        .format(repo_name, r['items_available']))
-
-    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
-    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
-    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
-
-    priority = https_url + other_url + http_url
-
-    git_config = []
-    git_url = None
-    for url in priority:
-        if url.startswith("http"):
-            u = urllib.parse.urlsplit(url)
-            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
-            git_config = ["-c", "credential.%s/.username=none" % baseurl,
-                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
-        else:
-            git_config = []
-
-        try:
-            logger.debug("trying %s", url)
-            arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
-                                      env={"HOME": os.environ["HOME"],
-                                           "ARVADOS_API_TOKEN": api.api_token,
-                                           "GIT_ASKPASS": "/bin/false"})
-        except arvados.errors.CommandFailedError:
-            pass
-        else:
-            git_url = url
-            break
-
-    if not git_url:
-        raise Exception('Cannot access git repository, tried {}'
-                        .format(priority))
-
-    if git_url.startswith("http:"):
-        if allow_insecure_http:
-            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
-        else:
-            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
-
-    return (git_url, git_config)
-
-
 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
     """Copy the docker image identified by docker_image and
     docker_image_tag from src to dst. Create appropriate
@@ -683,17 +774,31 @@ def copy_project(obj_uuid, src, dst, owner_uuid, args):
 
     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
 
+    partial_error = ""
+
     # Copy collections
-    copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
-                     src, dst, args)
+    try:
+        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
+                         src, dst, args)
+    except Exception as e:
+        partial_error += "\n" + str(e)
 
     # Copy workflows
-    for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
-        copy_workflow(w["uuid"], src, dst, args)
+    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
+        try:
+            copy_workflow(w["uuid"], src, dst, args)
+        except Exception as e:
+            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
 
     if args.recursive:
-        for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
-            copy_project(g["uuid"], src, dst, project_record["uuid"], args)
+        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
+            try:
+                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
+            except Exception as e:
+                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
+
+    project_record["partial_error"] = partial_error
 
     return project_record
 
@@ -704,9 +809,14 @@ def copy_project(obj_uuid, src, dst, owner_uuid, args):
 #    repository)
 #
 def git_rev_parse(rev, repo):
-    gitout, giterr = arvados.util.run_command(
-        ['git', 'rev-parse', rev], cwd=repo)
-    return gitout.strip()
+    proc = subprocess.run(
+        ['git', 'rev-parse', rev],
+        check=True,
+        cwd=repo,
+        stdout=subprocess.PIPE,
+        text=True,
+    )
+    return proc.stdout.strip()
 
 # uuid_type(api, object_uuid)
 #
@@ -721,6 +831,10 @@ def git_rev_parse(rev, repo):
 def uuid_type(api, object_uuid):
     if re.match(arvados.util.keep_locator_pattern, object_uuid):
         return 'Collection'
+
+    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
+        return 'httpURL'
+
     p = object_uuid.split('-')
     if len(p) == 3:
         type_prefix = p[1]
@@ -730,6 +844,27 @@ def uuid_type(api, object_uuid):
                 return k
     return None
 
+
+def copy_from_http(url, src, dst, args):
+    project_uuid = args.project_uuid
+    varying_url_params = args.varying_url_params
+    prefer_cached_downloads = args.prefer_cached_downloads
+
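+    # If a collection for this URL already exists on the source cluster,
+    # copy that collection to the destination instead of re-downloading.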
+    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
+                                                   varying_url_params=varying_url_params,
+                                                   prefer_cached_downloads=prefer_cached_downloads)
+    if cached[2] is not None:
+        return copy_collection(cached[2], src, dst, args)
+
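+    # Otherwise, download the URL into Keep on the destination; on success
+    # the third element of the returned tuple is the new collection's UUID.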
+    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
+                                               varying_url_params=varying_url_params,
+                                               prefer_cached_downloads=prefer_cached_downloads)
+
+    if cached is not None:
+        return {"uuid": cached[2]}
+
+
 def abort(msg, code=1):
     logger.info("arv-copy: %s", msg)
     exit(code)