import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
+from arvados.api import OrderedJsonModel
+
logger = logging.getLogger('arvados.arv-copy')
# local_repo_dir records which git repositories from the Arvados source
# destination collection UUIDs.
collections_copied = {}
+# Set of (repository, script_version) two-tuples of commits copied in git.
+scripts_copied = set()
+
# The owner_uuid of the object being copied
src_owner_uuid = None
exit(0)
def set_src_owner_uuid(resource, uuid, args):
    """Record the owner of the object identified by uuid.

    The record is fetched with resource.get(uuid=uuid) and executed with
    num_retries=args.retries.  The value of its "owner_uuid" entry, as
    returned by the record's get() method, is stored in the module-level
    src_owner_uuid variable.  Nothing is returned.
    """
    # Without this declaration the assignment below would only bind a
    # function-local name and the module-level value would never change.
    global src_owner_uuid
    request = resource.get(uuid=uuid)
    record = request.execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
client = arvados.api('v1',
host=cfg['ARVADOS_API_HOST'],
token=cfg['ARVADOS_API_TOKEN'],
- insecure=api_is_insecure)
+ insecure=api_is_insecure,
+ model=OrderedJsonModel())
else:
abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
return client
+# Check if git is available
def check_git_availability():
    """Verify that the git command can be run, calling abort() if not.

    Runs "git --help" through arvados.util.run_command().  If that call
    raises any Exception, abort() is invoked with a message asking the
    user to install git.
    """
    probe_command = ['git', '--help']
    try:
        arvados.util.run_command(probe_command)
    except Exception:
        # Any failure to run the probe is reported the same way; the
        # specific cause is not inspected.
        abort('git command is not available. Please ensure git is installed.')
+
# copy_pipeline_instance(pi_uuid, src, dst, args)
#
# Copies a pipeline instance identified by pi_uuid from src to dst.
pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
if args.recursive:
+ check_git_availability()
+
if not args.dst_git_repo:
abort('--dst-git-repo is required when copying a pipeline recursively.')
# Copy the pipeline template and save the copied template.
pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
if args.recursive:
+ check_git_availability()
+
if not args.dst_git_repo:
abort('--dst-git-repo is required when copying a pipeline recursively.')
# Copy input collections, docker images and git repos.
obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
return obj
- elif type(obj) == dict:
+ elif isinstance(obj, dict):
return {v: copy_collections(obj[v], src, dst, args) for v in obj}
- elif type(obj) == list:
+ elif isinstance(obj, list):
return [copy_collections(v, src, dst, args) for v in obj]
return obj
def migrate_jobspec(jobspec, src, dst, dst_repo, args):
    """Copy a job's script to the destination repository, and update its record.

    Given a jobspec dictionary, this function finds the referenced script from
    src and copies it to dst and dst_repo.  It also updates jobspec in place to
    refer to names on the destination.

    Arguments:
    * jobspec: dict describing a pipeline component or job.  It is modified
      in place.  If it has no 'repository' entry (or the entry is None) the
      function returns without touching it.
    * src, dst, args: passed through unchanged to copy_git_repo().
    * dst_repo: stored as the new jobspec['repository'] and passed to
      copy_git_repo().

    Side effects: adds (repository, script_version) to the module-level
    scripts_copied set the first time that pair is copied, so the same
    commit is not copied twice.

    Returns None.
    """
    repo = jobspec.get('repository')
    if repo is None:
        return
    # script_version is the "script_version" parameter from the source
    # component or job.  If no script_version was supplied in the
    # component or job, it is a mistake in the pipeline, but for the
    # purposes of copying the repository, default to "master".
    script_version = jobspec.get('script_version') or 'master'
    script_key = (repo, script_version)
    if script_key not in scripts_copied:
        copy_git_repo(repo, src, dst, dst_repo, script_version, args)
        scripts_copied.add(script_key)
    jobspec['repository'] = dst_repo
    # The local clone directory is looked up by source repository name;
    # copy_git_repo() is expected to have recorded it in local_repo_dir.
    repo_dir = local_repo_dir[repo]
    for version_key in ('script_version', 'supplied_script_version'):
        # Only resolve versions that are actually set.  A key that is
        # present but empty/None is tolerated above (it falls back to
        # "master" for the copy), so it must not be handed to
        # git_rev_parse() here; it is left unchanged in the jobspec.
        if jobspec.get(version_key):
            jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
+
# copy_git_repos(p, src, dst, dst_repo, args)
#
# Copies all git repositories referenced by pipeline instance or
# names. The return value is undefined.
#
def copy_git_repos(p, src, dst, dst_repo, args):
    """Migrate the git references of every component of pipeline object p.

    For each value in p['components'], migrate_jobspec() is called on the
    component itself and, when the component has a 'job' entry, on that
    job as well.  src, dst, dst_repo and args are passed through unchanged.
    The component and job dictionaries are handed to migrate_jobspec()
    directly, so any changes it makes appear in p.

    Returns None.
    """
    # values() (rather than the Python 2-only itervalues()) keeps this
    # loop working on both Python 2 and Python 3 mappings.
    for component in p['components'].values():
        migrate_jobspec(component, src, dst, dst_repo, args)
        if 'job' in component:
            migrate_jobspec(component['job'], src, dst, dst_repo, args)
def total_collection_size(manifest_text):
"""Return the total number of bytes in this collection (excluding
for link_class in ("docker_image_repo+tag", "docker_image_hash"):
docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
- for d in docker_links:
- body={
- 'head_uuid': dst_collection['uuid'],
- 'link_class': link_class,
- 'name': d['name'],
- }
+ for src_link in docker_links:
+ body = {key: src_link[key]
+ for key in ['link_class', 'name', 'properties']}
+ body['head_uuid'] = dst_collection['uuid']
body['owner_uuid'] = args.project_uuid
lk = dst.links().create(body=body).execute(num_retries=args.retries)
(c.get('name') == d['name']) and
(c['portable_data_hash'] == d['portable_data_hash'])):
return d
-
+ c['manifest_text'] = dst.collections().get(
+ uuid=dstcol['items'][0]['uuid']
+ ).execute(num_retries=args.retries)['manifest_text']
return create_collection_from(c, src, dst, args)
# Fetch the collection's manifest.
else:
progress_writer = None
- for line in manifest.splitlines(True):
+ for line in manifest.splitlines():
words = line.split()
- dst_manifest_line = words[0]
+ dst_manifest += words[0]
for word in words[1:]:
try:
loc = arvados.KeepLocator(word)
- blockhash = loc.md5sum
- # copy this block if we haven't seen it before
- # (otherwise, just reuse the existing dst_locator)
- if blockhash not in dst_locators:
- logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
- if progress_writer:
- progress_writer.report(obj_uuid, bytes_written, bytes_expected)
- data = src_keep.get(word)
- dst_locator = dst_keep.put(data)
- dst_locators[blockhash] = dst_locator
- bytes_written += loc.size
- dst_manifest_line += ' ' + dst_locators[blockhash]
except ValueError:
# If 'word' can't be parsed as a locator,
# presume it's a filename.
- dst_manifest_line += ' ' + word
- dst_manifest += dst_manifest_line
- if line.endswith("\n"):
- dst_manifest += "\n"
+ dst_manifest += ' ' + word
+ continue
+ blockhash = loc.md5sum
+ # copy this block if we haven't seen it before
+ # (otherwise, just reuse the existing dst_locator)
+ if blockhash not in dst_locators:
+ logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
+ if progress_writer:
+ progress_writer.report(obj_uuid, bytes_written, bytes_expected)
+ data = src_keep.get(word)
+ dst_locator = dst_keep.put(data)
+ dst_locators[blockhash] = dst_locator
+ bytes_written += loc.size
+ dst_manifest += ' ' + dst_locators[blockhash]
+ dst_manifest += "\n"
if progress_writer:
progress_writer.report(obj_uuid, bytes_written, bytes_expected)
# Copy the manifest and save the collection.
logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
- dst_keep.put(dst_manifest.encode('utf-8'))
c['manifest_text'] = dst_manifest
return create_collection_from(c, src, dst, args)
# All commits will be copied to a destination branch named for the
# source repository URL.
#
-# Because users cannot create their own repositories, the
-# destination repository must already exist.
+# The destination repository must already exist.
#
# The user running this command must be authenticated
# to both repositories.
dst_git_push_url = r['items'][0]['push_url']
logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
- # script_version is the "script_version" parameter from the source
- # component or job. It is used here to tie the destination branch
- # to the commit that was used on the source. If no script_version
- # was supplied in the component or job, it is a mistake in the pipeline,
- # but for the purposes of copying the repository, default to "master".
- #
- if not script_version:
- script_version = "master"
-
dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
- # Copy git commits from src repo to dst repo (but only if
- # we have not already copied this repo in this session).
- #
- if src_git_repo in local_repo_dir:
- logger.debug('already copied src repo %s, skipping', src_git_repo)
- else:
- tmprepo = tempfile.mkdtemp()
- local_repo_dir[src_git_repo] = tmprepo
+ # Copy git commits from src repo to dst repo.
+ if src_git_repo not in local_repo_dir:
+ local_repo_dir[src_git_repo] = tempfile.mkdtemp()
arvados.util.run_command(
- ["git", "clone", "--bare", src_git_url, tmprepo],
- cwd=os.path.dirname(tmprepo))
+ ["git", "clone", "--bare", src_git_url,
+ local_repo_dir[src_git_repo]],
+ cwd=os.path.dirname(local_repo_dir[src_git_repo]))
arvados.util.run_command(
- ["git", "branch", dst_branch, script_version],
- cwd=tmprepo)
- arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
- arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
-
+ ["git", "remote", "add", "dst", dst_git_push_url],
+ cwd=local_repo_dir[src_git_repo])
+ arvados.util.run_command(
+ ["git", "branch", dst_branch, script_version],
+ cwd=local_repo_dir[src_git_repo])
+ arvados.util.run_command(["git", "push", "dst", dst_branch],
+ cwd=local_repo_dir[src_git_repo])
def copy_docker_images(pipeline, src, dst, args):
"""Copy any docker images named in the pipeline components'