20933: Use [0-9] instead of \d in regex
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
index 8850d0bfd5a82c10633a3b39d0d5958961ead940..6c7d873af4a0d7741123502c3444112ecafcf395 100755 (executable)
@@ -2,14 +2,14 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-# arv-copy [--recursive] [--no-recursive] object-uuid src dst
+# arv-copy [--recursive] [--no-recursive] object-uuid
 #
 # Copies an object from Arvados instance src to instance dst.
 #
 # By default, arv-copy recursively copies any dependent objects
 # necessary to make the object functional in the new instance
-# (e.g. for a pipeline instance, arv-copy copies the pipeline
-# template, input collection, docker images, git repositories). If
+# (e.g. for a workflow, arv-copy copies the workflow definition,
+# input collections, and docker images). If
 # --no-recursive is given, arv-copy copies only the single record
 # identified by object-uuid.
 #
@@ -30,10 +30,15 @@ import getpass
 import os
 import re
 import shutil
+import subprocess
 import sys
 import logging
 import tempfile
 import urllib.parse
+import io
+import json
+import queue
+import threading
 
 import arvados
 import arvados.config
@@ -41,9 +46,9 @@ import arvados.keep
 import arvados.util
 import arvados.commands._util as arv_cmd
 import arvados.commands.keepdocker
+import arvados.http_to_keep
 import ruamel.yaml as yaml
 
-from arvados.api import OrderedJsonModel
 from arvados._version import __version__
 
 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
@@ -87,32 +92,28 @@ def main():
         '-f', '--force', dest='force', action='store_true',
         help='Perform copy even if the object appears to exist at the remote destination.')
     copy_opts.add_argument(
-        '--force-filters', action='store_true', default=False,
-        help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
+        '--src', dest='source_arvados',
+        help='The source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
     copy_opts.add_argument(
-        '--src', dest='source_arvados', required=True,
-        help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
-    copy_opts.add_argument(
-        '--dst', dest='destination_arvados', required=True,
-        help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
+        '--dst', dest='destination_arvados',
+        help='The name of the destination Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
     copy_opts.add_argument(
         '--recursive', dest='recursive', action='store_true',
-        help='Recursively copy any dependencies for this object. (default)')
+        help='Recursively copy any dependencies for this object, as well as any subprojects. (default)')
     copy_opts.add_argument(
         '--no-recursive', dest='recursive', action='store_false',
-        help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
-    copy_opts.add_argument(
-        '--dst-git-repo', dest='dst_git_repo',
-        help='The name of the destination git repository. Required when copying a pipeline recursively.')
+        help='Do not copy any dependencies or subprojects.')
     copy_opts.add_argument(
         '--project-uuid', dest='project_uuid',
-        help='The UUID of the project at the destination to which the pipeline should be copied.')
+        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
     copy_opts.add_argument(
-        '--allow-git-http-src', action="store_true",
-        help='Allow cloning git repositories over insecure http')
-    copy_opts.add_argument(
-        '--allow-git-http-dst', action="store_true",
-        help='Allow pushing git repositories over insecure http')
+        '--storage-classes', dest='storage_classes',
+        help='Comma-separated list of storage classes to be used when saving data to the destination Arvados instance.')
+    copy_opts.add_argument("--varying-url-params", type=str, default="",
+                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
 
     copy_opts.add_argument(
         'object_uuid',
@@ -121,55 +122,71 @@ def main():
     copy_opts.set_defaults(recursive=True)
 
     parser = argparse.ArgumentParser(
-        description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
+        description='Copy a workflow, collection, or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
         parents=[copy_opts, arv_cmd.retry_opt])
     args = parser.parse_args()
 
+    if args.storage_classes:
+        args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
+
     if args.verbose:
         logger.setLevel(logging.DEBUG)
     else:
         logger.setLevel(logging.INFO)
 
+    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
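+        # An Arvados UUID begins with the five-character cluster id of
+        # the instance that issued it, so the source cluster can be
+        # inferred directly from the object UUID.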
+        args.source_arvados = args.object_uuid[:5]
+
     # Create API clients for the source and destination instances
-    src_arv = api_for_instance(args.source_arvados)
-    dst_arv = api_for_instance(args.destination_arvados)
+    src_arv = api_for_instance(args.source_arvados, args.retries)
+    dst_arv = api_for_instance(args.destination_arvados, args.retries)
 
     if not args.project_uuid:
         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
 
     # Identify the kind of object we have been given, and begin copying.
     t = uuid_type(src_arv, args.object_uuid)
-    if t == 'Collection':
-        set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
-        result = copy_collection(args.object_uuid,
-                                 src_arv, dst_arv,
-                                 args)
-    elif t == 'PipelineInstance':
-        set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
-        result = copy_pipeline_instance(args.object_uuid,
-                                        src_arv, dst_arv,
-                                        args)
-    elif t == 'PipelineTemplate':
-        set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
-        result = copy_pipeline_template(args.object_uuid,
-                                        src_arv, dst_arv, args)
-    elif t == 'Workflow':
-        set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
-        result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
-    else:
-        abort("cannot copy object {} of type {}".format(args.object_uuid, t))
+
+    try:
+        if t == 'Collection':
+            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
+            result = copy_collection(args.object_uuid,
+                                     src_arv, dst_arv,
+                                     args)
+        elif t == 'Workflow':
+            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
+            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
+        elif t == 'Group':
+            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
+            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
+        elif t == 'httpURL':
+            result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
+        else:
+            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
+    except Exception as e:
+        logger.error("%s", e, exc_info=args.verbose)
+        exit(1)
 
     # Clean up any outstanding temp git repositories.
     for d in listvalues(local_repo_dir):
         shutil.rmtree(d, ignore_errors=True)
 
+    if not result:
+        exit(1)
+
     # If no exception was thrown and the response does not have an
     # error_token field, presume success
-    if 'error_token' in result or 'uuid' not in result:
-        logger.error("API server returned an error result: {}".format(result))
+    if result is None or 'error_token' in result or 'uuid' not in result:
+        if result:
+            logger.error("API server returned an error result: {}".format(result))
+        exit(1)
+
+    print(result['uuid'])
+
+    if result.get('partial_error'):
+        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
         exit(1)
 
-    logger.info("")
     logger.info("Success: created copy with uuid {}".format(result['uuid']))
     exit(0)
 
@@ -190,7 +207,11 @@ def set_src_owner_uuid(resource, uuid, args):
 #     Otherwise, it is presumed to be the name of a file in
 #     $HOME/.config/arvados/instance_name.conf
 #
-def api_for_instance(instance_name):
+def api_for_instance(instance_name, num_retries):
+    if not instance_name:
+        # Use environment
+        return arvados.api('v1')
+
     if '/' in instance_name:
         config_file = instance_name
     else:
@@ -213,7 +234,8 @@ def api_for_instance(instance_name):
                              host=cfg['ARVADOS_API_HOST'],
                              token=cfg['ARVADOS_API_TOKEN'],
                              insecure=api_is_insecure,
-                             model=OrderedJsonModel())
+                             num_retries=num_retries,
+                             )
     else:
         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
     return client
@@ -221,71 +243,14 @@ def api_for_instance(instance_name):
 # Check if git is available
 def check_git_availability():
     try:
-        arvados.util.run_command(['git', '--help'])
-    except Exception:
+        subprocess.run(
+            ['git', '--version'],
+            check=True,
+            stdout=subprocess.DEVNULL,
+        )
+    except (subprocess.CalledProcessError, FileNotFoundError):
         abort('git command is not available. Please ensure git is installed.')
 
-# copy_pipeline_instance(pi_uuid, src, dst, args)
-#
-#    Copies a pipeline instance identified by pi_uuid from src to dst.
-#
-#    If the args.recursive option is set:
-#      1. Copies all input collections
-#           * For each component in the pipeline, include all collections
-#             listed as job dependencies for that component)
-#      2. Copy docker images
-#      3. Copy git repositories
-#      4. Copy the pipeline template
-#
-#    The only changes made to the copied pipeline instance are:
-#      1. The original pipeline instance UUID is preserved in
-#         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
-#      2. The pipeline_template_uuid is changed to the new template uuid.
-#      3. The owner_uuid of the instance is changed to the user who
-#         copied it.
-#
-def copy_pipeline_instance(pi_uuid, src, dst, args):
-    # Fetch the pipeline instance record.
-    pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
-
-    if args.recursive:
-        check_git_availability()
-
-        if not args.dst_git_repo:
-            abort('--dst-git-repo is required when copying a pipeline recursively.')
-        # Copy the pipeline template and save the copied template.
-        if pi.get('pipeline_template_uuid', None):
-            pt = copy_pipeline_template(pi['pipeline_template_uuid'],
-                                        src, dst, args)
-
-        # Copy input collections, docker images and git repos.
-        pi = copy_collections(pi, src, dst, args)
-        copy_git_repos(pi, src, dst, args.dst_git_repo, args)
-        copy_docker_images(pi, src, dst, args)
-
-        # Update the fields of the pipeline instance with the copied
-        # pipeline template.
-        if pi.get('pipeline_template_uuid', None):
-            pi['pipeline_template_uuid'] = pt['uuid']
-
-    else:
-        # not recursive
-        logger.info("Copying only pipeline instance %s.", pi_uuid)
-        logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
-
-    # Update the pipeline instance properties, and create the new
-    # instance at dst.
-    pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
-    pi['description'] = "Pipeline copied from {}\n\n{}".format(
-        pi_uuid,
-        pi['description'] if pi.get('description', None) else '')
-
-    pi['owner_uuid'] = args.project_uuid
-
-    del pi['uuid']
-
-    new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
-    return new_pi
 
 def filter_iter(arg):
     """Iterate a filter string-or-list.
@@ -340,82 +305,6 @@ def exception_handler(handler, *exc_types):
     except exc_types as error:
         handler(error)
 
-def migrate_components_filters(template_components, dst_git_repo):
-    """Update template component filters in-place for the destination.
-
-    template_components is a dictionary of components in a pipeline template.
-    This method walks over each component's filters, and updates them to have
-    identical semantics on the destination cluster.  It returns a list of
-    error strings that describe what filters could not be updated safely.
-
-    dst_git_repo is the name of the destination Git repository, which can
-    be None if that is not known.
-    """
-    errors = []
-    for cname, cspec in template_components.items():
-        def add_error(errmsg):
-            errors.append("{}: {}".format(cname, errmsg))
-        if not isinstance(cspec, dict):
-            add_error("value is not a component definition")
-            continue
-        src_repository = cspec.get('repository')
-        filters = cspec.get('filters', [])
-        if not isinstance(filters, list):
-            add_error("filters are not a list")
-            continue
-        for cfilter in filters:
-            if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
-                add_error("malformed filter {!r}".format(cfilter))
-                continue
-            if attr_filtered(cfilter, 'repository'):
-                with exception_handler(add_error, ValueError):
-                    migrate_repository_filter(cfilter, src_repository, dst_git_repo)
-            if attr_filtered(cfilter, 'script_version'):
-                with exception_handler(add_error, ValueError):
-                    migrate_script_version_filter(cfilter)
-    return errors
-
-# copy_pipeline_template(pt_uuid, src, dst, args)
-#
-#    Copies a pipeline template identified by pt_uuid from src to dst.
-#
-#    If args.recursive is True, also copy any collections, docker
-#    images and git repositories that this template references.
-#
-#    The owner_uuid of the new template is changed to that of the user
-#    who copied the template.
-#
-#    Returns the copied pipeline template object.
-#
-def copy_pipeline_template(pt_uuid, src, dst, args):
-    # fetch the pipeline template from the source instance
-    pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
-
-    if not args.force_filters:
-        filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
-        if filter_errors:
-            abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
-                  "\n".join(filter_errors))
-
-    if args.recursive:
-        check_git_availability()
-
-        if not args.dst_git_repo:
-            abort('--dst-git-repo is required when copying a pipeline recursively.')
-        # Copy input collections, docker images and git repos.
-        pt = copy_collections(pt, src, dst, args)
-        copy_git_repos(pt, src, dst, args.dst_git_repo, args)
-        copy_docker_images(pt, src, dst, args)
-
-    pt['description'] = "Pipeline template copied from {}\n\n{}".format(
-        pt_uuid,
-        pt['description'] if pt.get('description', None) else '')
-    pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
-    del pt['uuid']
-
-    pt['owner_uuid'] = args.project_uuid
-
-    return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
 
 # copy_workflow(wf_uuid, src, dst, args)
 #
@@ -433,28 +322,37 @@ def copy_workflow(wf_uuid, src, dst, args):
     # fetch the workflow from the source instance
     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
 
+    if not wf["definition"]:
+        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
+
     # copy collections and docker images
-    if args.recursive:
-        wf_def = yaml.safe_load(wf["definition"])
-        if wf_def is not None:
-            locations = []
-            docker_images = {}
-            graph = wf_def.get('$graph', None)
-            if graph is not None:
-                workflow_collections(graph, locations, docker_images)
-            else:
-                workflow_collections(wf_def, locations, docker_images)
+    if args.recursive and wf["definition"]:
+        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
+               "ARVADOS_API_TOKEN": src.api_token,
+               "PATH": os.environ["PATH"]}
+        try:
+            result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
+                                    capture_output=True, env=env)
+        except (FileNotFoundError, subprocess.CalledProcessError):
+            logger.error('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
+            return
 
-            if locations:
-                copy_collections(locations, src, dst, args)
+        locations = json.loads(result.stdout)
 
-            for image in docker_images:
-                copy_docker_image(image, docker_images[image], src, dst, args)
+        if locations:
+            copy_collections(locations, src, dst, args)
 
     # copy the workflow itself
     del wf['uuid']
     wf['owner_uuid'] = args.project_uuid
-    return dst.workflows().create(body=wf).execute(num_retries=args.retries)
+
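+    # If a workflow with the same name already exists in the destination
+    # project, update it instead of creating a duplicate.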
+    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
+                                             ["name", "=", wf["name"]]]).execute()
+    if len(existing["items"]) == 0:
+        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
+    else:
+        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
+
 
 def workflow_collections(obj, locations, docker_images):
     if isinstance(obj, dict):
@@ -463,7 +361,7 @@ def workflow_collections(obj, locations, docker_images):
             if loc.startswith("keep:"):
                 locations.append(loc[5:])
 
-        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
+        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
         if docker_image is not None:
             ds = docker_image.split(":", 1)
             tag = ds[1] if len(ds)==2 else 'latest'
@@ -518,53 +416,6 @@ def copy_collections(obj, src, dst, args):
         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
     return obj
 
-def migrate_jobspec(jobspec, src, dst, dst_repo, args):
-    """Copy a job's script to the destination repository, and update its record.
-
-    Given a jobspec dictionary, this function finds the referenced script from
-    src and copies it to dst and dst_repo.  It also updates jobspec in place to
-    refer to names on the destination.
-    """
-    repo = jobspec.get('repository')
-    if repo is None:
-        return
-    # script_version is the "script_version" parameter from the source
-    # component or job.  If no script_version was supplied in the
-    # component or job, it is a mistake in the pipeline, but for the
-    # purposes of copying the repository, default to "master".
-    script_version = jobspec.get('script_version') or 'master'
-    script_key = (repo, script_version)
-    if script_key not in scripts_copied:
-        copy_git_repo(repo, src, dst, dst_repo, script_version, args)
-        scripts_copied.add(script_key)
-    jobspec['repository'] = dst_repo
-    repo_dir = local_repo_dir[repo]
-    for version_key in ['script_version', 'supplied_script_version']:
-        if version_key in jobspec:
-            jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
-
-# copy_git_repos(p, src, dst, dst_repo, args)
-#
-#    Copies all git repositories referenced by pipeline instance or
-#    template 'p' from src to dst.
-#
-#    For each component c in the pipeline:
-#      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
-#      * Rename script versions:
-#          * c['script_version']
-#          * c['job']['script_version']
-#          * c['job']['supplied_script_version']
-#        to the commit hashes they resolve to, since any symbolic
-#        names (tags, branches) are not preserved in the destination repo.
-#
-#    The pipeline object is updated in place with the new repository
-#    names.  The return value is undefined.
-#
-def copy_git_repos(p, src, dst, dst_repo, args):
-    for component in p['components'].values():
-        migrate_jobspec(component, src, dst, dst_repo, args)
-        if 'job' in component:
-            migrate_jobspec(component['job'], src, dst, dst_repo, args)
 
 def total_collection_size(manifest_text):
     """Return the total number of bytes in this collection (excluding
@@ -590,17 +441,19 @@ def create_collection_from(c, src, dst, args):
     available."""
 
     collection_uuid = c['uuid']
-    del c['uuid']
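+    # Build a fresh record containing only the fields we want to copy,
+    # rather than mutating the source record in place.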
+    body = {}
+    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
+        body[d] = c[d]
 
-    if not c["name"]:
-        c['name'] = "copied from " + collection_uuid
+    if not body["name"]:
+        body['name'] = "copied from " + collection_uuid
 
-    if 'properties' in c:
-        del c['properties']
+    if args.storage_classes:
+        body['storage_classes_desired'] = args.storage_classes
 
-    c['owner_uuid'] = args.project_uuid
+    body['owner_uuid'] = args.project_uuid
 
-    dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
+    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
 
     # Create docker_image_repo+tag and docker_image_hash links
     # at the destination.
@@ -642,10 +495,11 @@ def create_collection_from(c, src, dst, args):
 #
 def copy_collection(obj_uuid, src, dst, args):
     if arvados.util.keep_locator_pattern.match(obj_uuid):
-        # If the obj_uuid is a portable data hash, it might not be uniquely
-        # identified with a particular collection.  As a result, it is
-        # ambigious as to what name to use for the copy.  Apply some heuristics
-        # to pick which collection to get the name from.
+        # If the obj_uuid is a portable data hash, it might not be
+        # uniquely identified with a particular collection.  As a
+        # result, it is ambiguous as to what name to use for the copy.
+        # Apply some heuristics to pick which collection to get the
+        # name from.
         srccol = src.collections().list(
             filters=[['portable_data_hash', '=', obj_uuid]],
             order="created_at asc"
@@ -664,7 +518,7 @@ def copy_collection(obj_uuid, src, dst, args):
             c = items[0]
         if not c:
             # See if there is a collection that's in the same project
-            # as the root item (usually a pipeline) being copied.
+            # as the root item (usually a workflow) being copied.
             for i in items:
                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                     c = i
@@ -721,7 +575,7 @@ def copy_collection(obj_uuid, src, dst, args):
     # a new manifest as we go.
     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
-    dst_manifest = ""
+    dst_manifest = io.StringIO()
     dst_locators = {}
     bytes_written = 0
     bytes_expected = total_collection_size(manifest)
@@ -730,39 +584,150 @@ def copy_collection(obj_uuid, src, dst, args):
     else:
         progress_writer = None
 
+    # go through the words
+    # put each block loc into 'get' queue
+    # 'get' threads get block and put it into 'put' queue
+    # 'put' threads put block and then update dst_locators
+    #
+    # after going through the whole manifest we go back through it
+    # again and build dst_manifest
+
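+    # 'lock' guards dst_locators and bytes_written, which are shared by
+    # all worker threads.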
+    lock = threading.Lock()
+
+    # the get queue should be unbounded because we'll add all the
+    # block hashes we want to get, but these are small
+    get_queue = queue.Queue()
+
+    threadcount = 4
+
+    # the put queue contains full data blocks
+    # and if 'get' is faster than 'put' we could end up consuming
+    # a great deal of RAM if it isn't bounded.
+    put_queue = queue.Queue(threadcount)
+    transfer_error = []
+
+    def get_thread():
+        while True:
+            word = get_queue.get()
+            if word is None:
+                put_queue.put(None)
+                get_queue.task_done()
+                return
+
+            blockhash = arvados.KeepLocator(word).md5sum
+            with lock:
+                if blockhash in dst_locators:
+                    # Already uploaded
+                    get_queue.task_done()
+                    continue
+
+            try:
+                logger.debug("Getting block %s", word)
+                data = src_keep.get(word)
+                put_queue.put((word, data))
+            except Exception as e:
+                logger.error("Error getting block %s: %s", word, e)
+                transfer_error.append(e)
+                try:
+                    # Drain the 'get' queue so we end early
+                    while True:
+                        get_queue.get(False)
+                        get_queue.task_done()
+                except queue.Empty:
+                    pass
+            finally:
+                get_queue.task_done()
+
+    def put_thread():
+        nonlocal bytes_written
+        while True:
+            item = put_queue.get()
+            if item is None:
+                put_queue.task_done()
+                return
+
+            word, data = item
+            loc = arvados.KeepLocator(word)
+            blockhash = loc.md5sum
+            with lock:
+                if blockhash in dst_locators:
+                    # Already uploaded
+                    put_queue.task_done()
+                    continue
+
+            try:
+                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
+                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+                with lock:
+                    dst_locators[blockhash] = dst_locator
+                    bytes_written += loc.size
+                    if progress_writer:
+                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
+            except Exception as e:
+                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
+                try:
+                    # Drain the 'get' queue so we end early
+                    while True:
+                        get_queue.get(False)
+                        get_queue.task_done()
+                except queue.Empty:
+                    pass
+                transfer_error.append(e)
+            finally:
+                put_queue.task_done()
+
     for line in manifest.splitlines():
         words = line.split()
-        dst_manifest += words[0]
         for word in words[1:]:
             try:
                 loc = arvados.KeepLocator(word)
             except ValueError:
                 # If 'word' can't be parsed as a locator,
                 # presume it's a filename.
-                dst_manifest += ' ' + word
+                continue
+
+            get_queue.put(word)
+
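+    # Feed one sentinel per worker: each get thread consumes one None
+    # and forwards a None to the put queue, so every put thread also
+    # terminates.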
+    for i in range(0, threadcount):
+        get_queue.put(None)
+
+    for i in range(0, threadcount):
+        threading.Thread(target=get_thread, daemon=True).start()
+
+    for i in range(0, threadcount):
+        threading.Thread(target=put_thread, daemon=True).start()
+
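+    # Wait for every queued block to be fetched and stored (or for the
+    # queues to be drained after an error).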
+    get_queue.join()
+    put_queue.join()
+
+    if len(transfer_error) > 0:
+        return {"error_token": "Failed to transfer blocks"}
+
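+    # Second pass over the manifest: rewrite it with the destination
+    # locators recorded during the transfer.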
+    for line in manifest.splitlines():
+        words = line.split()
+        dst_manifest.write(words[0])
+        for word in words[1:]:
+            try:
+                loc = arvados.KeepLocator(word)
+            except ValueError:
+                # If 'word' can't be parsed as a locator,
+                # presume it's a filename.
+                dst_manifest.write(' ')
+                dst_manifest.write(word)
                 continue
             blockhash = loc.md5sum
-            # copy this block if we haven't seen it before
-            # (otherwise, just reuse the existing dst_locator)
-            if blockhash not in dst_locators:
-                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
-                if progress_writer:
-                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
-                data = src_keep.get(word)
-                dst_locator = dst_keep.put(data)
-                dst_locators[blockhash] = dst_locator
-                bytes_written += loc.size
-            dst_manifest += ' ' + dst_locators[blockhash]
-        dst_manifest += "\n"
+            dst_manifest.write(' ')
+            dst_manifest.write(dst_locators[blockhash])
+        dst_manifest.write("\n")
 
     if progress_writer:
         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
         progress_writer.finish()
 
     # Copy the manifest and save the collection.
-    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
+    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
 
-    c['manifest_text'] = dst_manifest
+    c['manifest_text'] = dst_manifest.getvalue()
     return create_collection_from(c, src, dst, args)
 
 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
@@ -778,8 +743,6 @@ def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_
 
     priority = https_url + other_url + http_url
 
-    git_config = []
-    git_url = None
     for url in priority:
         if url.startswith("http"):
             u = urllib.parse.urlsplit(url)
@@ -791,17 +754,22 @@ def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_
 
         try:
             logger.debug("trying %s", url)
-            arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
-                                      env={"HOME": os.environ["HOME"],
-                                           "ARVADOS_API_TOKEN": api.api_token,
-                                           "GIT_ASKPASS": "/bin/false"})
-        except arvados.errors.CommandFailedError:
+            subprocess.run(
+                ['git', *git_config, 'ls-remote', url],
+                check=True,
+                env={
+                    'ARVADOS_API_TOKEN': api.api_token,
+                    'GIT_ASKPASS': '/bin/false',
+                    'HOME': os.environ['HOME'],
+                },
+                stdout=subprocess.DEVNULL,
+            )
+        except subprocess.CalledProcessError:
             pass
         else:
             git_url = url
             break
-
-    if not git_url:
+    else:
         raise Exception('Cannot access git repository, tried {}'
                         .format(priority))
 
@@ -814,68 +782,6 @@ def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_
     return (git_url, git_config)
 
 
-# copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
-#
-#    Copies commits from git repository 'src_git_repo' on Arvados
-#    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
-#    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
-#    or "jsmith")
-#
-#    All commits will be copied to a destination branch named for the
-#    source repository URL.
-#
-#    The destination repository must already exist.
-#
-#    The user running this command must be authenticated
-#    to both repositories.
-#
-def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
-    # Identify the fetch and push URLs for the git repositories.
-
-    (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
-    (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
-
-    logger.debug('src_git_url: {}'.format(src_git_url))
-    logger.debug('dst_git_url: {}'.format(dst_git_url))
-
-    dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
-
-    # Copy git commits from src repo to dst repo.
-    if src_git_repo not in local_repo_dir:
-        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
-        arvados.util.run_command(
-            ["git"] + src_git_config + ["clone", "--bare", src_git_url,
-             local_repo_dir[src_git_repo]],
-            cwd=os.path.dirname(local_repo_dir[src_git_repo]),
-            env={"HOME": os.environ["HOME"],
-                 "ARVADOS_API_TOKEN": src.api_token,
-                 "GIT_ASKPASS": "/bin/false"})
-        arvados.util.run_command(
-            ["git", "remote", "add", "dst", dst_git_url],
-            cwd=local_repo_dir[src_git_repo])
-    arvados.util.run_command(
-        ["git", "branch", dst_branch, script_version],
-        cwd=local_repo_dir[src_git_repo])
-    arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
-                             cwd=local_repo_dir[src_git_repo],
-                             env={"HOME": os.environ["HOME"],
-                                  "ARVADOS_API_TOKEN": dst.api_token,
-                                  "GIT_ASKPASS": "/bin/false"})
-
-def copy_docker_images(pipeline, src, dst, args):
-    """Copy any docker images named in the pipeline components'
-    runtime_constraints field from src to dst."""
-
-    logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
-    for c_name, c_info in pipeline['components'].items():
-        if ('runtime_constraints' in c_info and
-            'docker_image' in c_info['runtime_constraints']):
-            copy_docker_image(
-                c_info['runtime_constraints']['docker_image'],
-                c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
-                src, dst, args)
-
-
 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
     """Copy the docker image identified by docker_image and
     docker_image_tag from src to dst. Create appropriate
@@ -899,6 +805,56 @@ def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
     else:
         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
 
+def copy_project(obj_uuid, src, dst, owner_uuid, args):
+
+    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
+
+    # Create/update the destination project
+    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
+                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
+    if len(existing["items"]) == 0:
+        project_record = dst.groups().create(body={"group": {"group_class": "project",
+                                                             "owner_uuid": owner_uuid,
+                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
+    else:
+        project_record = existing["items"][0]
+
+    dst.groups().update(uuid=project_record["uuid"],
+                        body={"group": {
+                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)
+
+    args.project_uuid = project_record["uuid"]
+
+    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
+
+
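+    # Accumulate per-item errors so the rest of the project still gets
+    # copied; they are reported afterwards via 'partial_error'.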
+    partial_error = ""
+
+    # Copy collections
+    try:
+        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
+                         src, dst, args)
+    except Exception as e:
+        partial_error += "\n" + str(e)
+
+    # Copy workflows
+    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
+        try:
+            copy_workflow(w["uuid"], src, dst, args)
+        except Exception as e:
+            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
+
+    if args.recursive:
+        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
+            try:
+                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
+            except Exception as e:
+                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
+
+    project_record["partial_error"] = partial_error
+
+    return project_record
+
 # git_rev_parse(rev, repo)
 #
 #    Returns the 40-character commit hash corresponding to 'rev' in
@@ -906,9 +862,14 @@ def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
 #    repository)
 #
 def git_rev_parse(rev, repo):
-    gitout, giterr = arvados.util.run_command(
-        ['git', 'rev-parse', rev], cwd=repo)
-    return gitout.strip()
+    proc = subprocess.run(
+        ['git', 'rev-parse', rev],
+        check=True,
+        cwd=repo,
+        stdout=subprocess.PIPE,
+        text=True,
+    )
+    return proc.stdout.strip()
 
 # uuid_type(api, object_uuid)
 #
@@ -916,13 +877,17 @@ def git_rev_parse(rev, repo):
 #    the second field of the uuid.  This function consults the api's
 #    schema to identify the object class.
 #
-#    It returns a string such as 'Collection', 'PipelineInstance', etc.
+#    It returns a string such as 'Collection', 'Workflow', etc.
 #
 #    Special case: if handed a Keep locator hash, return 'Collection'.
 #
 def uuid_type(api, object_uuid):
-    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
+    if re.match(arvados.util.keep_locator_pattern, object_uuid):
         return 'Collection'
+
+    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
+        return 'httpURL'
+
     p = object_uuid.split('-')
     if len(p) == 3:
         type_prefix = p[1]
@@ -932,6 +897,27 @@ def uuid_type(api, object_uuid):
                 return k
     return None
 
+
+def copy_from_http(url, src, dst, args):
+
+    project_uuid = args.project_uuid
+    varying_url_params = args.varying_url_params
+    prefer_cached_downloads = args.prefer_cached_downloads
+
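+    # If the source cluster already has this URL cached in Keep, copy
+    # the cached collection; otherwise fetch the URL directly into the
+    # destination cluster.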
+    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
+                                                   varying_url_params=varying_url_params,
+                                                   prefer_cached_downloads=prefer_cached_downloads)
+    if cached[2] is not None:
+        return copy_collection(cached[2], src, dst, args)
+
+    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
+                                               varying_url_params=varying_url_params,
+                                               prefer_cached_downloads=prefer_cached_downloads)
+
+    if cached is not None:
+        return {"uuid": cached[2]}
+
+
 def abort(msg, code=1):
     logger.info("arv-copy: %s", msg)
     exit(code)