# collections_copied maps already-copied source collection UUIDs to the
# destination collection UUIDs.
collections_copied = {}
+# Set of (repository, script_version) two-tuples of commits copied in git.
+scripts_copied = set()
+
+# The owner_uuid of the object being copied
+src_owner_uuid = None
+
def main():
    # NOTE(review): this is a diff hunk; unchanged context lines between
    # hunks are elided, so this body is not contiguous source.
    copy_opts = argparse.ArgumentParser(add_help=False)
    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)
    if t == 'Collection':
+        set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
        result = copy_collection(args.object_uuid,
                                 src_arv, dst_arv,
                                 args)
    elif t == 'PipelineInstance':
+        set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
        result = copy_pipeline_instance(args.object_uuid,
                                        src_arv, dst_arv,
                                        args)
    elif t == 'PipelineTemplate':
+        set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
        result = copy_pipeline_template(args.object_uuid,
                                        src_arv, dst_arv, args)
    else:
    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)
def set_src_owner_uuid(resource, uuid, args):
    """Look up `uuid` via `resource` and record its owner_uuid.

    Stores the result in the module-level `src_owner_uuid` so that later
    copy heuristics can prefer collections from the same source project.
    """
    global src_owner_uuid
    record = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name.
#
def copy_pipeline_instance(pi_uuid, src, dst, args):
    # NOTE(review): diff hunk; unchanged context lines are elided.
    # Fetch the pipeline instance record.
-    pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
+    # Honor the user-requested --retries count on every API call.
+    pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
    if args.recursive:
        if not args.dst_git_repo:
        # Copy input collections, docker images and git repos.
        pi = copy_collections(pi, src, dst, args)
-        copy_git_repos(pi, src, dst, args.dst_git_repo)
+        # copy_git_repos now needs args so it can pass --retries downstream.
+        copy_git_repos(pi, src, dst, args.dst_git_repo, args)
        copy_docker_images(pi, src, dst, args)
    # Update the fields of the pipeline instance with the copied
    del pi['uuid']
-    new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute()
+    new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
    return new_pi
# copy_pipeline_template(pt_uuid, src, dst, args)
#
def copy_pipeline_template(pt_uuid, src, dst, args):
    # NOTE(review): diff hunk; unchanged context lines are elided.
    # fetch the pipeline template from the source instance
-    pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
+    pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
    if args.recursive:
        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy input collections, docker images and git repos.
        pt = copy_collections(pt, src, dst, args)
-        copy_git_repos(pt, src, dst, args.dst_git_repo)
+        copy_git_repos(pt, src, dst, args.dst_git_repo, args)
        copy_docker_images(pt, src, dst, args)
    pt['description'] = "Pipeline template copied from {}\n\n{}".format(
    pt['owner_uuid'] = args.project_uuid
-    return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute()
+    return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
# copy_collections(obj, src, dst, args)
#
# NOTE(review): definition elided in this diff fragment; only the
# recursion tail (list handling and the default pass-through) is visible.
    return [copy_collections(v, src, dst, args) for v in obj]
    return obj
-# copy_git_repos(p, src, dst, dst_repo)
def migrate_jobspec(jobspec, src, dst, dst_repo, args):
    """Copy a job's script to the destination repository, and update its record.

    Given a jobspec dictionary, this function finds the referenced script from
    src and copies it to dst and dst_repo. It also updates jobspec in place to
    refer to names on the destination.
    """
    src_repo = jobspec.get('repository')
    if src_repo is None:
        # Nothing to migrate for jobs that reference no git repository.
        return
    # Fall back to "master" when the component or job supplied no
    # script_version.  The omission is a mistake in the pipeline, but for
    # the purposes of copying the repository we still want a usable ref.
    version = jobspec.get('script_version') or 'master'
    key = (src_repo, version)
    if key not in scripts_copied:
        copy_git_repo(src_repo, src, dst, dst_repo, version, args)
        scripts_copied.add(key)
    jobspec['repository'] = dst_repo
    checkout_dir = local_repo_dir[src_repo]
    for field in ('script_version', 'supplied_script_version'):
        if field in jobspec:
            jobspec[field] = git_rev_parse(jobspec[field], checkout_dir)
+
+# copy_git_repos(p, src, dst, dst_repo, args)
#
# Copies all git repositories referenced by pipeline instance or
# template 'p' from src to dst.
# The pipeline object is updated in place with the new repository
# names. The return value is undefined.
#
def copy_git_repos(p, src, dst, dst_repo, args):
    """Copy all git repositories referenced by pipeline object `p`.

    Each component's jobspec -- and, when present, its most recent job
    record -- is migrated in place by migrate_jobspec, which rewrites the
    repository name and resolves script versions to commit hashes on the
    destination.  The return value is undefined.
    """
    # dict.itervalues() is Python-2-only; .values() iterates the same
    # component records and also works on Python 3.
    for component in p['components'].values():
        migrate_jobspec(component, src, dst, dst_repo, args)
        if 'job' in component:
            migrate_jobspec(component['job'], src, dst, dst_repo, args)
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    available."""
    # NOTE(review): docstring and body are elided in this diff fragment.
    # NOTE(review): interior of create_collection_from; the enclosing def
    # and surrounding context lines are elided in this diff fragment.
    collection_uuid = c['uuid']
-
    del c['uuid']
+    # Give the copy a fallback name so the destination record is findable.
+    if not c["name"]:
+        c['name'] = "copied from " + collection_uuid
+
    if 'properties' in c:
        del c['properties']
    # Re-create the docker image metadata links that pointed at the source
    # collection so the copied image stays discoverable at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
-        for d in docker_links:
-            body={
-                'head_uuid': dst_collection['uuid'],
-                'link_class': link_class,
-                'name': d['name'],
-            }
+        for src_link in docker_links:
+            # Carry over the link's properties too (the old code dropped them).
+            body = {key: src_link[key]
+                    for key in ['link_class', 'name', 'properties']}
+            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid
            lk = dst.links().create(body=body).execute(num_retries=args.retries)
# hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    # NOTE(review): diff hunk; unchanged context lines are elided
    # (e.g. the 'force' check and the manifest-fetch logic near the end).
-    c = src.collections().get(uuid=obj_uuid).execute()
+    if arvados.util.keep_locator_pattern.match(obj_uuid):
+        # If the obj_uuid is a portable data hash, it might not be uniquely
+        # identified with a particular collection. As a result, it is
+        # ambiguous as to what name to use for the copy. Apply some heuristics
+        # to pick which collection to get the name from.
+        srccol = src.collections().list(
+            filters=[['portable_data_hash', '=', obj_uuid]],
+            order="created_at asc"
+        ).execute(num_retries=args.retries)
+
+        items = srccol.get("items")
+
+        if not items:
+            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
+            return
+
+        c = None
+
+        if len(items) == 1:
+            # There's only one collection with the PDH, so use that.
+            c = items[0]
+        if not c:
+            # See if there is a collection that's in the same project
+            # as the root item (usually a pipeline) being copied.
+            for i in items:
+                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
+                    c = i
+                    break
+        if not c:
+            # Didn't find any collections located in the same project, so
+            # pick the oldest collection that has a name assigned to it.
+            for i in items:
+                if i.get("name"):
+                    c = i
+                    break
+        if not c:
+            # None of the collections have names (?!), so just pick the
+            # first one.
+            c = items[0]
+
+        # list() doesn't return manifest text (and we don't want it to,
+        # because we don't need the same manifest text sent to us 50
+        # times) so go and retrieve the collection object directly
+        # which will include the manifest text.
+        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
+    else:
+        # Assume this is an actual collection uuid, so fetch it directly.
+        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    colhash = c['uuid']
    dstcol = dst.collections().list(
        filters=[['portable_data_hash', '=', colhash]]
-    ).execute()
+    ).execute(num_retries=args.retries)
    if dstcol['items_available'] > 0:
        for d in dstcol['items']:
            if ((args.project_uuid == d['owner_uuid']) and
-                (c['name'] == d['name']) and
+                # .get() because PDH-derived records may lack a name.
+                (c.get('name') == d['name']) and
                (c['portable_data_hash'] == d['portable_data_hash'])):
                return d
-
+        # Same content already exists under a different name/owner: reuse
+        # its manifest instead of re-uploading the data.
+        c['manifest_text'] = dst.collections().get(
+            uuid=dstcol['items'][0]['uuid']
+        ).execute(num_retries=args.retries)['manifest_text']
        return create_collection_from(c, src, dst, args)
    # Fetch the collection's manifest.
    c['manifest_text'] = dst_manifest
    return create_collection_from(c, src, dst, args)
-# copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version)
+# copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
#
# Copies commits from git repository 'src_git_repo' on Arvados
# instance 'src' to 'dst_git_repo' on 'dst'. Both src_git_repo
# and dst_git_repo are repository names, not UUIDs.
# All commits will be copied to a destination branch named for the
# source repository URL.
#
-# Because users cannot create their own repositories, the
-# destination repository must already exist.
+# The destination repository must already exist.
#
# The user running this command must be authenticated
# to both repositories.
#
-def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version):
+def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
    # NOTE(review): diff hunk; some context lines are elided (e.g. the
    # assignment of src_git_url from the source repo's fetch_url).
    # Identify the fetch and push URLs for the git repositories.
    r = src.repositories().list(
-        filters=[['name', '=', src_git_repo]]).execute()
+        filters=[['name', '=', src_git_repo]]).execute(num_retries=args.retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify source repo {}; {} repos found'
                        .format(src_git_repo, r['items_available']))
    logger.debug('src_git_url: {}'.format(src_git_url))
    r = dst.repositories().list(
-        filters=[['name', '=', dst_git_repo]]).execute()
+        filters=[['name', '=', dst_git_repo]]).execute(num_retries=args.retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify destination repo {}; {} repos found'
                        .format(dst_git_repo, r['items_available']))
    dst_git_push_url = r['items'][0]['push_url']
    logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
-    # script_version is the "script_version" parameter from the source
-    # component or job. It is used here to tie the destination branch
-    # to the commit that was used on the source. If no script_version
-    # was supplied in the component or job, it is a mistake in the pipeline,
-    # but for the purposes of copying the repository, default to "master".
-    #
-    if not script_version:
-        script_version = "master"
-
    dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
-    # Copy git commits from src repo to dst repo (but only if
-    # we have not already copied this repo in this session).
-    #
-    if src_git_repo in local_repo_dir:
-        logger.debug('already copied src repo %s, skipping', src_git_repo)
-    else:
-        tmprepo = tempfile.mkdtemp()
-        local_repo_dir[src_git_repo] = tmprepo
+    # Copy git commits from src repo to dst repo.
+    if src_git_repo not in local_repo_dir:
+        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
        arvados.util.run_command(
-            ["git", "clone", "--bare", src_git_url, tmprepo],
-            cwd=os.path.dirname(tmprepo))
+            ["git", "clone", "--bare", src_git_url,
+             local_repo_dir[src_git_repo]],
+            cwd=os.path.dirname(local_repo_dir[src_git_repo]))
        arvados.util.run_command(
-            ["git", "branch", dst_branch, script_version],
-            cwd=tmprepo)
-        arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
-        arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
-
+        ["git", "remote", "add", "dst", dst_git_push_url],
+        cwd=local_repo_dir[src_git_repo])
+    arvados.util.run_command(
+        ["git", "branch", dst_branch, script_version],
+        cwd=local_repo_dir[src_git_repo])
+    arvados.util.run_command(["git", "push", "dst", dst_branch],
+        cwd=local_repo_dir[src_git_repo])
def copy_docker_images(pipeline, src, dst, args):
    """Copy any docker images named in the pipeline components'
    # NOTE(review): diff hunk; the docstring tail and the loop over
    # components (which binds docker_image / docker_image_tag) are elided.
    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
-    image_uuid, image_info = docker_image_list[0]
-    logger.debug('copying collection {} {}'.format(image_uuid, image_info))
-
-    # Copy the collection it refers to.
-    dst_image_col = copy_collection(image_uuid, src, dst, args)
-
+    if docker_image_list:
+        image_uuid, image_info = docker_image_list[0]
+        logger.debug('copying collection {} {}'.format(image_uuid, image_info))
+
+        # Copy the collection it refers to.
+        dst_image_col = copy_collection(image_uuid, src, dst, args)
+    elif arvados.util.keep_locator_pattern.match(docker_image):
+        # The "image name" is actually a Keep locator; copy it as a collection.
+        dst_image_col = copy_collection(docker_image, src, dst, args)
+    else:
+        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
# git_rev_parse(rev, repo)
#