12684: Support num_retries in PySDK client constructors
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
index dc6cdac885c1b79b5bbd8acb384e36d57087bf0c..63c0cbea28b41ecec9ad81b0076201beae489563 100755 (executable)
@@ -2,7 +2,7 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-# arv-copy [--recursive] [--no-recursive] object-uuid src dst
+# arv-copy [--recursive] [--no-recursive] object-uuid
 #
 # Copies an object from Arvados instance src to instance dst.
 #
@@ -34,6 +34,7 @@ import sys
 import logging
 import tempfile
 import urllib.parse
+import io
 
 import arvados
 import arvados.config
@@ -87,20 +88,23 @@ def main():
         '-f', '--force', dest='force', action='store_true',
         help='Perform copy even if the object appears to exist at the remote destination.')
     copy_opts.add_argument(
-        '--src', dest='source_arvados', required=True,
-        help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
+        '--src', dest='source_arvados',
+        help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
     copy_opts.add_argument(
-        '--dst', dest='destination_arvados', required=True,
-        help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
+        '--dst', dest='destination_arvados',
+        help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
     copy_opts.add_argument(
         '--recursive', dest='recursive', action='store_true',
-        help='Recursively copy any dependencies for this object. (default)')
+        help='Recursively copy any dependencies for this object, and subprojects. (default)')
     copy_opts.add_argument(
         '--no-recursive', dest='recursive', action='store_false',
-        help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
+        help='Do not copy any dependencies or subprojects.')
     copy_opts.add_argument(
         '--project-uuid', dest='project_uuid',
         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
+    copy_opts.add_argument(
+        '--storage-classes', dest='storage_classes',
+        help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
 
     copy_opts.add_argument(
         'object_uuid',
@@ -109,18 +113,24 @@ def main():
     copy_opts.set_defaults(recursive=True)
 
     parser = argparse.ArgumentParser(
-        description='Copy a workflow or collection from one Arvados instance to another.',
+        description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
         parents=[copy_opts, arv_cmd.retry_opt])
     args = parser.parse_args()
 
+    if args.storage_classes:
+        args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
+
     if args.verbose:
         logger.setLevel(logging.DEBUG)
     else:
         logger.setLevel(logging.INFO)
 
+    if not args.source_arvados:
+        args.source_arvados = args.object_uuid[:5]
+
     # Create API clients for the source and destination instances
-    src_arv = api_for_instance(args.source_arvados)
-    dst_arv = api_for_instance(args.destination_arvados)
+    src_arv = api_for_instance(args.source_arvados, args.retries)
+    dst_arv = api_for_instance(args.destination_arvados, args.retries)
 
     if not args.project_uuid:
         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
@@ -135,6 +145,9 @@ def main():
     elif t == 'Workflow':
         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
+    elif t == 'Group':
+        set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
+        result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
     else:
         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
 
@@ -148,7 +161,12 @@ def main():
         logger.error("API server returned an error result: {}".format(result))
         exit(1)
 
-    logger.info("")
+    print(result['uuid'])
+
+    if result.get('partial_error'):
+        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
+        exit(1)
+
     logger.info("Success: created copy with uuid {}".format(result['uuid']))
     exit(0)
 
@@ -169,7 +187,11 @@ def set_src_owner_uuid(resource, uuid, args):
 #     Otherwise, it is presumed to be the name of a file in
 #     $HOME/.config/arvados/instance_name.conf
 #
-def api_for_instance(instance_name):
+def api_for_instance(instance_name, num_retries):
+    if not instance_name:
+        # Use environment
+        return arvados.api('v1', model=OrderedJsonModel())
+
     if '/' in instance_name:
         config_file = instance_name
     else:
@@ -192,7 +214,9 @@ def api_for_instance(instance_name):
                              host=cfg['ARVADOS_API_HOST'],
                              token=cfg['ARVADOS_API_TOKEN'],
                              insecure=api_is_insecure,
-                             model=OrderedJsonModel())
+                             model=OrderedJsonModel(),
+                             num_retries=num_retries,
+                             )
     else:
         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
     return client
@@ -275,8 +299,11 @@ def copy_workflow(wf_uuid, src, dst, args):
     # fetch the workflow from the source instance
     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
 
+    if not wf["definition"]:
+        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
+
     # copy collections and docker images
-    if args.recursive:
+    if args.recursive and wf["definition"]:
         wf_def = yaml.safe_load(wf["definition"])
         if wf_def is not None:
             locations = []
@@ -296,7 +323,14 @@ def copy_workflow(wf_uuid, src, dst, args):
     # copy the workflow itself
     del wf['uuid']
     wf['owner_uuid'] = args.project_uuid
-    return dst.workflows().create(body=wf).execute(num_retries=args.retries)
+
+    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
+                                             ["name", "=", wf["name"]]]).execute()
+    if len(existing["items"]) == 0:
+        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
+    else:
+        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
+
 
 def workflow_collections(obj, locations, docker_images):
     if isinstance(obj, dict):
@@ -392,6 +426,9 @@ def create_collection_from(c, src, dst, args):
     if not body["name"]:
         body['name'] = "copied from " + collection_uuid
 
+    if args.storage_classes:
+        body['storage_classes_desired'] = args.storage_classes
+
     body['owner_uuid'] = args.project_uuid
 
     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
@@ -516,7 +553,7 @@ def copy_collection(obj_uuid, src, dst, args):
     # a new manifest as we go.
     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
-    dst_manifest = ""
+    dst_manifest = io.StringIO()
     dst_locators = {}
     bytes_written = 0
     bytes_expected = total_collection_size(manifest)
@@ -527,14 +564,15 @@ def copy_collection(obj_uuid, src, dst, args):
 
     for line in manifest.splitlines():
         words = line.split()
-        dst_manifest += words[0]
+        dst_manifest.write(words[0])
         for word in words[1:]:
             try:
                 loc = arvados.KeepLocator(word)
             except ValueError:
                 # If 'word' can't be parsed as a locator,
                 # presume it's a filename.
-                dst_manifest += ' ' + word
+                dst_manifest.write(' ')
+                dst_manifest.write(word)
                 continue
             blockhash = loc.md5sum
             # copy this block if we haven't seen it before
@@ -544,20 +582,21 @@ def copy_collection(obj_uuid, src, dst, args):
                 if progress_writer:
                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                 data = src_keep.get(word)
-                dst_locator = dst_keep.put(data)
+                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
                 dst_locators[blockhash] = dst_locator
                 bytes_written += loc.size
-            dst_manifest += ' ' + dst_locators[blockhash]
-        dst_manifest += "\n"
+            dst_manifest.write(' ')
+            dst_manifest.write(dst_locators[blockhash])
+        dst_manifest.write("\n")
 
     if progress_writer:
         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
         progress_writer.finish()
 
     # Copy the manifest and save the collection.
-    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
+    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
 
-    c['manifest_text'] = dst_manifest
+    c['manifest_text'] = dst_manifest.getvalue()
     return create_collection_from(c, src, dst, args)
 
 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
@@ -632,6 +671,56 @@ def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
     else:
         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
 
+def copy_project(obj_uuid, src, dst, owner_uuid, args):
+    """Copy project obj_uuid from src into owner_uuid on dst; return the dst record.
+
+    Reuses a same-named group under owner_uuid or creates a project, copies in the
+    source's collections and workflows (and subprojects if args.recursive), and
+    reassigns args.project_uuid.  The result's "partial_error" is "" or the
+    newline-prefixed failure messages, including those from subprojects.
+    """
+    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
+    # Create/update the destination project
+    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
+                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
+    if len(existing["items"]) == 0:
+        project_record = dst.groups().create(body={"group": {"group_class": "project",
+                                                             "owner_uuid": owner_uuid,
+                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
+    else:
+        project_record = existing["items"][0]
+    dst.groups().update(uuid=project_record["uuid"],
+                        body={"group": {
+                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)
+
+    args.project_uuid = project_record["uuid"]  # owner that copy_workflow() assigns
+    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
+    partial_error = ""
+
+    # Copy collections
+    try:
+        copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
+                         src, dst, args)
+    except Exception as e:
+        partial_error += "\n" + str(e)
+
+    # Copy workflows
+    for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
+        try:
+            copy_workflow(w["uuid"], src, dst, args)
+        except Exception as e:
+            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
+
+    if args.recursive:
+        for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
+            try:
+                # A subproject reports its failures in its return value; keep them.
+                partial_error += copy_project(g["uuid"], src, dst, project_record["uuid"], args)["partial_error"]
+            except Exception as e:
+                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
+    project_record["partial_error"] = partial_error
+    return project_record
+
 # git_rev_parse(rev, repo)
 #
 #    Returns the 40-character commit hash corresponding to 'rev' in
@@ -654,7 +743,7 @@ def git_rev_parse(rev, repo):
 #    Special case: if handed a Keep locator hash, return 'Collection'.
 #
 def uuid_type(api, object_uuid):
-    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
+    if re.match(arvados.util.keep_locator_pattern, object_uuid):
         return 'Collection'
     p = object_uuid.split('-')
     if len(p) == 3: