Merge branch 'master' into 6827-no-passwords-in-logs
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
index 9e6d4312f3ef233ac1daa223b598099be1e7d29b..f728b2c59142359bf66855ec530dd82bfb483507 100755 (executable)
@@ -32,6 +32,8 @@ import arvados.util
 import arvados.commands._util as arv_cmd
 import arvados.commands.keepdocker
 
+from arvados.api import OrderedJsonModel
+
 logger = logging.getLogger('arvados.arv-copy')
 
 # local_repo_dir records which git repositories from the Arvados source
@@ -46,6 +48,9 @@ local_repo_dir = {}
 # destination collection UUIDs.
 collections_copied = {}
 
+# Set of (repository, script_version) two-tuples of commits copied in git.
+scripts_copied = set()
+
 # The owner_uuid of the object being copied
 src_owner_uuid = None
 
@@ -139,6 +144,7 @@ def main():
     exit(0)
 
 def set_src_owner_uuid(resource, uuid, args):
+    global src_owner_uuid
     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
     src_owner_uuid = c.get("owner_uuid")
 
@@ -176,11 +182,19 @@ def api_for_instance(instance_name):
         client = arvados.api('v1',
                              host=cfg['ARVADOS_API_HOST'],
                              token=cfg['ARVADOS_API_TOKEN'],
-                             insecure=api_is_insecure)
+                             insecure=api_is_insecure,
+                             model=OrderedJsonModel())
     else:
         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
     return client
 
+# Check if git is available
+def check_git_availability():
+    try:
+        arvados.util.run_command(['git', '--help'])
+    except Exception:
+        abort('git command is not available. Please ensure git is installed.')
+
 # copy_pipeline_instance(pi_uuid, src, dst, args)
 #
 #    Copies a pipeline instance identified by pi_uuid from src to dst.
@@ -205,6 +219,8 @@ def copy_pipeline_instance(pi_uuid, src, dst, args):
     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
 
     if args.recursive:
+        check_git_availability()
+
         if not args.dst_git_repo:
             abort('--dst-git-repo is required when copying a pipeline recursively.')
         # Copy the pipeline template and save the copied template.
@@ -258,6 +274,8 @@ def copy_pipeline_template(pt_uuid, src, dst, args):
     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
 
     if args.recursive:
+        check_git_availability()
+
         if not args.dst_git_repo:
             abort('--dst-git-repo is required when copying a pipeline recursively.')
         # Copy input collections, docker images and git repos.
@@ -311,12 +329,37 @@ def copy_collections(obj, src, dst, args):
         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
         return obj
-    elif type(obj) == dict:
+    elif isinstance(obj, dict):
         return {v: copy_collections(obj[v], src, dst, args) for v in obj}
-    elif type(obj) == list:
+    elif isinstance(obj, list):
         return [copy_collections(v, src, dst, args) for v in obj]
     return obj
 
+def migrate_jobspec(jobspec, src, dst, dst_repo, args):
+    """Copy a job's script to the destination repository, and update its record.
+
+    Given a jobspec dictionary, this function finds the referenced script from
+    src and copies it to dst and dst_repo.  It also updates jobspec in place to
+    refer to names on the destination.
+    """
+    repo = jobspec.get('repository')
+    if repo is None:
+        return
+    # script_version is the "script_version" parameter from the source
+    # component or job.  If no script_version was supplied in the
+    # component or job, it is a mistake in the pipeline, but for the
+    # purposes of copying the repository, default to "master".
+    script_version = jobspec.get('script_version') or 'master'
+    script_key = (repo, script_version)
+    if script_key not in scripts_copied:
+        copy_git_repo(repo, src, dst, dst_repo, script_version, args)
+        scripts_copied.add(script_key)
+    jobspec['repository'] = dst_repo
+    repo_dir = local_repo_dir[repo]
+    for version_key in ['script_version', 'supplied_script_version']:
+        if version_key in jobspec:
+            jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
+
 # copy_git_repos(p, src, dst, dst_repo, args)
 #
 #    Copies all git repositories referenced by pipeline instance or
@@ -335,33 +378,10 @@ def copy_collections(obj, src, dst, args):
 #    names.  The return value is undefined.
 #
 def copy_git_repos(p, src, dst, dst_repo, args):
-    copied = set()
-    for c in p['components']:
-        component = p['components'][c]
-        if 'repository' in component:
-            repo = component['repository']
-            script_version = component.get('script_version', None)
-            if repo not in copied:
-                copy_git_repo(repo, src, dst, dst_repo, script_version, args)
-                copied.add(repo)
-            component['repository'] = dst_repo
-            if script_version:
-                repo_dir = local_repo_dir[repo]
-                component['script_version'] = git_rev_parse(script_version, repo_dir)
+    for component in p['components'].itervalues():
+        migrate_jobspec(component, src, dst, dst_repo, args)
         if 'job' in component:
-            j = component['job']
-            if 'repository' in j:
-                repo = j['repository']
-                script_version = j.get('script_version', None)
-                if repo not in copied:
-                    copy_git_repo(repo, src, dst, dst_repo, script_version, args)
-                    copied.add(repo)
-                j['repository'] = dst_repo
-                repo_dir = local_repo_dir[repo]
-                if script_version:
-                    j['script_version'] = git_rev_parse(script_version, repo_dir)
-                if 'supplied_script_version' in j:
-                    j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
+            migrate_jobspec(component['job'], src, dst, dst_repo, args)
 
 def total_collection_size(manifest_text):
     """Return the total number of bytes in this collection (excluding
@@ -404,12 +424,10 @@ def create_collection_from(c, src, dst, args):
     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
 
-        for d in docker_links:
-            body={
-                'head_uuid': dst_collection['uuid'],
-                'link_class': link_class,
-                'name': d['name'],
-            }
+        for src_link in docker_links:
+            body = {key: src_link[key]
+                    for key in ['link_class', 'name', 'properties']}
+            body['head_uuid'] = dst_collection['uuid']
             body['owner_uuid'] = args.project_uuid
 
             lk = dst.links().create(body=body).execute(num_retries=args.retries)
@@ -506,7 +524,9 @@ def copy_collection(obj_uuid, src, dst, args):
                     (c.get('name') == d['name']) and
                     (c['portable_data_hash'] == d['portable_data_hash'])):
                     return d
-
+            c['manifest_text'] = dst.collections().get(
+                uuid=dstcol['items'][0]['uuid']
+            ).execute(num_retries=args.retries)['manifest_text']
             return create_collection_from(c, src, dst, args)
 
     # Fetch the collection's manifest.
@@ -527,31 +547,30 @@ def copy_collection(obj_uuid, src, dst, args):
     else:
         progress_writer = None
 
-    for line in manifest.splitlines(True):
+    for line in manifest.splitlines():
         words = line.split()
-        dst_manifest_line = words[0]
+        dst_manifest += words[0]
         for word in words[1:]:
             try:
                 loc = arvados.KeepLocator(word)
-                blockhash = loc.md5sum
-                # copy this block if we haven't seen it before
-                # (otherwise, just reuse the existing dst_locator)
-                if blockhash not in dst_locators:
-                    logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
-                    if progress_writer:
-                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
-                    data = src_keep.get(word)
-                    dst_locator = dst_keep.put(data)
-                    dst_locators[blockhash] = dst_locator
-                    bytes_written += loc.size
-                dst_manifest_line += ' ' + dst_locators[blockhash]
             except ValueError:
                 # If 'word' can't be parsed as a locator,
                 # presume it's a filename.
-                dst_manifest_line += ' ' + word
-        dst_manifest += dst_manifest_line
-        if line.endswith("\n"):
-            dst_manifest += "\n"
+                dst_manifest += ' ' + word
+                continue
+            blockhash = loc.md5sum
+            # copy this block if we haven't seen it before
+            # (otherwise, just reuse the existing dst_locator)
+            if blockhash not in dst_locators:
+                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
+                if progress_writer:
+                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
+                data = src_keep.get(word)
+                dst_locator = dst_keep.put(data)
+                dst_locators[blockhash] = dst_locator
+                bytes_written += loc.size
+            dst_manifest += ' ' + dst_locators[blockhash]
+        dst_manifest += "\n"
 
     if progress_writer:
         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
@@ -560,7 +579,6 @@ def copy_collection(obj_uuid, src, dst, args):
     # Copy the manifest and save the collection.
     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
 
-    dst_keep.put(dst_manifest.encode('utf-8'))
     c['manifest_text'] = dst_manifest
     return create_collection_from(c, src, dst, args)
 
@@ -574,8 +592,7 @@ def copy_collection(obj_uuid, src, dst, args):
 #    All commits will be copied to a destination branch named for the
 #    source repository URL.
 #
-#    Because users cannot create their own repositories, the
-#    destination repository must already exist.
+#    The destination repository must already exist.
 #
 #    The user running this command must be authenticated
 #    to both repositories.
@@ -598,34 +615,23 @@ def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
     dst_git_push_url  = r['items'][0]['push_url']
     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
 
-    # script_version is the "script_version" parameter from the source
-    # component or job.  It is used here to tie the destination branch
-    # to the commit that was used on the source.  If no script_version
-    # was supplied in the component or job, it is a mistake in the pipeline,
-    # but for the purposes of copying the repository, default to "master".
-    #
-    if not script_version:
-        script_version = "master"
-
     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
 
-    # Copy git commits from src repo to dst repo (but only if
-    # we have not already copied this repo in this session).
-    #
-    if src_git_repo in local_repo_dir:
-        logger.debug('already copied src repo %s, skipping', src_git_repo)
-    else:
-        tmprepo = tempfile.mkdtemp()
-        local_repo_dir[src_git_repo] = tmprepo
+    # Copy git commits from src repo to dst repo.
+    if src_git_repo not in local_repo_dir:
+        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
         arvados.util.run_command(
-            ["git", "clone", "--bare", src_git_url, tmprepo],
-            cwd=os.path.dirname(tmprepo))
+            ["git", "clone", "--bare", src_git_url,
+             local_repo_dir[src_git_repo]],
+            cwd=os.path.dirname(local_repo_dir[src_git_repo]))
         arvados.util.run_command(
-            ["git", "branch", dst_branch, script_version],
-            cwd=tmprepo)
-        arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
-        arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
-
+            ["git", "remote", "add", "dst", dst_git_push_url],
+            cwd=local_repo_dir[src_git_repo])
+    arvados.util.run_command(
+        ["git", "branch", dst_branch, script_version],
+        cwd=local_repo_dir[src_git_repo])
+    arvados.util.run_command(["git", "push", "dst", dst_branch],
+                             cwd=local_repo_dir[src_git_repo])
 
 def copy_docker_images(pipeline, src, dst, args):
     """Copy any docker images named in the pipeline components'