1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
7 # Copies an object from Arvados instance src to instance dst.
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a pipeline instance, arv-copy copies the pipeline
12 # template, input collection, docker images, git repositories). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst. If either of these files is not found,
19 # arv-copy will issue an error.
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
42 import arvados.commands._util as arv_cmd
43 import arvados.commands.keepdocker
44 import ruamel.yaml as yaml
46 from arvados.api import OrderedJsonModel
47 from arvados._version import __version__
# Matches a bare git commit hash: 1-40 lowercase hex digits.
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

# Module-level logger for this command.
logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# local temporary directory, keyed by repository name:
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
# NOTE(review): the `local_repo_dir = {}` initializer is not visible in
# this view of the file.

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
# NOTE(review): the `src_owner_uuid = None` initializer is not visible in
# this view of the file.
# Command-line definition and top-level copy dispatch.
# NOTE(review): several connecting lines (the enclosing function
# scaffolding, conditionals, and some call continuations) are elided in
# this view; indentation below is approximate.
copy_opts = argparse.ArgumentParser(add_help=False)

copy_opts.add_argument(
    '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
copy_opts.add_argument(
    '-v', '--verbose', dest='verbose', action='store_true',
    help='Verbose output.')
copy_opts.add_argument(
    '--progress', dest='progress', action='store_true',
    help='Report progress on copying collections. (default)')
copy_opts.add_argument(
    '--no-progress', dest='progress', action='store_false',
    help='Do not report progress on copying collections.')
copy_opts.add_argument(
    '-f', '--force', dest='force', action='store_true',
    help='Perform copy even if the object appears to exist at the remote destination.')
copy_opts.add_argument(
    '--force-filters', action='store_true', default=False,
    help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
copy_opts.add_argument(
    '--src', dest='source_arvados', required=True,
    help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
copy_opts.add_argument(
    '--dst', dest='destination_arvados', required=True,
    help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
copy_opts.add_argument(
    '--recursive', dest='recursive', action='store_true',
    help='Recursively copy any dependencies for this object. (default)')
copy_opts.add_argument(
    '--no-recursive', dest='recursive', action='store_false',
    help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
copy_opts.add_argument(
    '--dst-git-repo', dest='dst_git_repo',
    help='The name of the destination git repository. Required when copying a pipeline recursively.')
copy_opts.add_argument(
    '--project-uuid', dest='project_uuid',
    help='The UUID of the project at the destination to which the pipeline should be copied.')
copy_opts.add_argument(
    '--allow-git-http-src', action="store_true",
    help='Allow cloning git repositories over insecure http')
copy_opts.add_argument(
    '--allow-git-http-dst', action="store_true",
    help='Allow pushing git repositories over insecure http')

# Positional argument; its name (presumably 'object_uuid' — see the uses
# of args.object_uuid below) is elided in this view.
copy_opts.add_argument(
    help='The UUID of the object to be copied.')
copy_opts.set_defaults(progress=True)
copy_opts.set_defaults(recursive=True)

parser = argparse.ArgumentParser(
    description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
    parents=[copy_opts, arv_cmd.retry_opt])
args = parser.parse_args()

# Verbosity selection (the conditional on args.verbose is elided).
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.INFO)

# Create API clients for the source and destination instances
src_arv = api_for_instance(args.source_arvados)
dst_arv = api_for_instance(args.destination_arvados)

# Default the destination project to the destination user's home project.
if not args.project_uuid:
    args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

# Identify the kind of object we have been given, and begin copying.
t = uuid_type(src_arv, args.object_uuid)
if t == 'Collection':
    set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
    result = copy_collection(args.object_uuid,
    # (remaining arguments of the copy_collection call are elided)
elif t == 'Workflow':
    set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
    result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
# (else branch — unsupported object types abort:)
    abort("cannot copy object {} of type {}".format(args.object_uuid, t))

# Clean up any outstanding temp git repositories.
for d in listvalues(local_repo_dir):
    shutil.rmtree(d, ignore_errors=True)

# If no exception was thrown and the response does not have an
# error_token field, presume success
if 'error_token' in result or 'uuid' not in result:
    logger.error("API server returned an error result: {}".format(result))
# (error exit and the success else-branch scaffolding are elided)
logger.info("Success: created copy with uuid {}".format(result['uuid']))
def set_src_owner_uuid(resource, uuid, args):
    """Look up the record `uuid` via `resource` and remember its owner.

    Stores the record's owner_uuid in the module global `src_owner_uuid`,
    which copy_collection later consults to pick a same-project collection.
    """
    global src_owner_uuid
    record = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name.
#
# If instance_name contains a slash, it is presumed to be a path
# (either local or absolute) to a file with Arvados configuration
# credentials.
#
# Otherwise, it is presumed to be the name of a file in
# $HOME/.config/arvados/instance_name.conf
#
# NOTE(review): some structural lines (an else:, a try:, the head of the
# insecure-flag assignment, and the return) are elided in this view;
# indentation below is approximate.
def api_for_instance(instance_name):
    if '/' in instance_name:
        config_file = instance_name
    # (else branch: shorthand name resolved under ~/.config/arvados)
        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    # (try: wrapper around config loading)
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        # (head of the api_is_insecure assignment is elided; the flag
        # accepts '1'/'t'/'true'/'y'/'yes', case-insensitively)
        cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
            ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             model=OrderedJsonModel())
        # (return client, and the missing-credentials else branch, elided)
    abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
# Check if git is available
def check_git_availability():
    """Abort the program unless the `git` CLI can be executed.

    As shown, the abort ran unconditionally even when git works; the
    try/except wrapper restores the intended probe-then-abort behavior.
    """
    try:
        arvados.util.run_command(['git', '--help'])
    except Exception:
        abort('git command is not available. Please ensure git is installed.')
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, basestring):
        # A lone string is treated as a one-element list.
        return iter((arg,))
    else:
        return iter(arg)
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository. If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        # Anything fancier than a simple equality/membership test on the
        # source repository cannot be rewritten safely.
        raise ValueError("repository filter is not a simple source match")
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes. If they're not, it raises ValueError to indicate that
    the filter is not portable. It could be extended to make other
    transformations in the future.
    """
    # Symbolic refs (branches/tags) would resolve differently on the
    # destination, so only raw commit hashes are portable.
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    # The filter's first element names the attribute(s) it applies to;
    # 'any' is a wildcard matching every attribute.
    for field_name in filter_iter(filter_[0]):
        if (field_name == 'any') or (field_name in attr_names):
            return True
    return False
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        # Swallow the listed exception types after reporting them;
        # anything else propagates normally.
        handler(error)
def migrate_components_filters(template_components, dst_git_repo):
    """Update template component filters in-place for the destination.

    template_components is a dictionary of components in a pipeline template.
    This method walks over each component's filters, and updates them to have
    identical semantics on the destination cluster. It returns a list of
    error strings that describe what filters could not be updated safely.

    dst_git_repo is the name of the destination Git repository, which can
    be None if that is not known.
    """
    # NOTE(review): the errors-list initializer, several `continue`
    # statements after add_error calls, and the final `return errors`
    # are elided in this view; indentation below is approximate.
    for cname, cspec in template_components.items():
        def add_error(errmsg):
            # Record a per-component error message.
            errors.append("{}: {}".format(cname, errmsg))
        if not isinstance(cspec, dict):
            add_error("value is not a component definition")
        src_repository = cspec.get('repository')
        filters = cspec.get('filters', [])
        if not isinstance(filters, list):
            add_error("filters are not a list")
        for cfilter in filters:
            # Each filter must be a [attribute, operator, operand] triple.
            if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
                add_error("malformed filter {!r}".format(cfilter))
            if attr_filtered(cfilter, 'repository'):
                with exception_handler(add_error, ValueError):
                    migrate_repository_filter(cfilter, src_repository, dst_git_repo)
            if attr_filtered(cfilter, 'script_version'):
                with exception_handler(add_error, ValueError):
                    migrate_script_version_filter(cfilter)
# copy_workflow(wf_uuid, src, dst, args)
#
# Copies a workflow identified by wf_uuid from src to dst.
#
# If args.recursive is True, also copy any collections
# referenced in the workflow definition yaml.
#
# The owner_uuid of the new workflow is set to any given
# project_uuid or the user who copied the template.
#
# Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    # copy collections and docker images
    # NOTE(review): the `if args.recursive:` guard and the initialization
    # of `locations` and `docker_images` are elided in this view;
    # indentation below is approximate.
    wf_def = yaml.safe_load(wf["definition"])
    if wf_def is not None:
        # Packed CWL places multiple processes under a '$graph' key.
        graph = wf_def.get('$graph', None)
        if graph is not None:
            workflow_collections(graph, locations, docker_images)
        # (else branch: scan the whole definition directly)
        workflow_collections(wf_def, locations, docker_images)

    copy_collections(locations, src, dst, args)
    for image in docker_images:
        copy_docker_image(image, docker_images[image], src, dst, args)

    # copy the workflow itself
    wf['owner_uuid'] = args.project_uuid
    return dst.workflows().create(body=wf).execute(num_retries=args.retries)
def workflow_collections(obj, locations, docker_images):
    """Recursively scan a workflow definition for Keep and Docker references.

    Appends any "keep:..." location (with the prefix stripped) to
    `locations`, and records dockerImageId/dockerPull references in
    `docker_images` as {image_name: tag}. Both accumulators are mutated
    in place; nothing is returned.
    """
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            # Only Keep references are copied; other URI schemes are left alone.
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            # An image reference without an explicit tag means "latest".
            tag = ds[1] if len(ds) == 2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)
# copy_collections(obj, src, dst, args)
#
# Recursively copies all collections referenced by 'obj' from src
# to dst.  obj may be a dict or a list, in which case we run
# copy_collections on every value it contains. If it is a string,
# search it for any substring that matches a collection hash or uuid
# (this will find hidden references to collections like
# "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
# Returns a copy of obj with any old collection uuids replaced by
# the new ones.
#
def copy_collections(obj, src, dst, args):
    # NOTE(review): an else: line, the dict-comprehension's `for` clause,
    # and the trailing `return obj` are elided in this view; indentation
    # below is approximate.

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination. Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                # Identifier is unchanged at the destination (e.g. a PDH).
                collections_copied[src_id] = src_id
            # (else: remember the new destination uuid)
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
    elif isinstance(obj, dict):
        # Rebuild the mapping with every value recursively processed.
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
def migrate_jobspec(jobspec, src, dst, dst_repo, args):
    """Copy a job's script to the destination repository, and update its record.

    Given a jobspec dictionary, this function finds the referenced script from
    src and copies it to dst and dst_repo. It also updates jobspec in place to
    refer to names on the destination.
    """
    repo = jobspec.get('repository')
    # NOTE(review): an early-return guard for a missing repository appears
    # to be elided here.
    # script_version is the "script_version" parameter from the source
    # component or job. If no script_version was supplied in the
    # component or job, it is a mistake in the pipeline, but for the
    # purposes of copying the repository, default to "master".
    script_version = jobspec.get('script_version') or 'master'
    script_key = (repo, script_version)
    if script_key not in scripts_copied:
        # Only push each (repo, version) pair once per run.
        copy_git_repo(repo, src, dst, dst_repo, script_version, args)
        scripts_copied.add(script_key)
    jobspec['repository'] = dst_repo
    repo_dir = local_repo_dir[repo]
    for version_key in ['script_version', 'supplied_script_version']:
        if version_key in jobspec:
            # Pin symbolic refs (branches/tags) to the commit hash they
            # resolve to, since symbolic names are not pushed.
            jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
# copy_git_repos(p, src, dst, dst_repo, args)
#
# Copies every git repository referenced by pipeline instance or
# template 'p' from src to dst, via migrate_jobspec.
#
# For each component c in the pipeline:
#   * git repositories named in c['repository'] and c['job']['repository']
#     are copied when present;
#   * c['script_version'], c['job']['script_version'] and
#     c['job']['supplied_script_version'] are rewritten to the commit
#     hashes they resolve to, since symbolic names (tags, branches) are
#     not preserved in the destination repo.
#
# The pipeline object is updated in place with the new repository
# names. The return value is undefined.
def copy_git_repos(p, src, dst, dst_repo, args):
    for component in p['components'].values():
        # Migrate the component itself first, then its job record if any.
        jobspecs = [component]
        if 'job' in component:
            jobspecs.append(component['job'])
        for jobspec in jobspecs:
            migrate_jobspec(jobspec, src, dst, dst_repo, args)
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""
    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        # The first word of a manifest line is the stream name; every
        # following word is either a block locator or a file token.
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                # Count each distinct block exactly once.
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""
    # NOTE(review): the `body` initialization and a couple of guard lines
    # are elided in this view; indentation below is approximate.
    collection_uuid = c['uuid']
    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
        # (copy each attribute present on the source record into `body`)

    # (a guard for records without a name appears elided here)
    body['name'] = "copied from " + collection_uuid

    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
# copy_collection(obj_uuid, src, dst, args)
#
# Copies the collection identified by obj_uuid from src to dst.
# Returns the collection object created at dst.
#
# If args.progress is True, produce a human-friendly progress
# report.
#
# If a collection with the desired portable_data_hash already
# exists at dst, and args.force is False, copy_collection returns
# the existing collection without copying any blocks. Otherwise
# (if no collection exists or if args.force is True)
# copy_collection copies all of the collection data blocks from src
# to dst.
#
# For this application, it is critical to preserve the
# collection's manifest hash, which is not guaranteed with the
# arvados.CollectionReader and arvados.CollectionWriter classes.
# Copying each block in the collection manually, followed by
# the manifest block, ensures that the collection's manifest
# hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    # NOTE(review): many branch/guard lines and several local-variable
    # initializations are elided in this view; indentation below is
    # approximate.
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection. As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")
        # (empty / single / multiple-match handling is elided)
        logger.warning("Could not find collection with portable data hash %s", obj_uuid)

        # There's only one collection with the PDH, so use that.
        # See if there is a collection that's in the same project
        # as the root item (usually a pipeline) being copied.
        if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
        # Didn't find any collections located in the same project, so
        # pick the oldest collection that has a name assigned to it.
        # None of the collections have names (?!), so just pick the
        # first one.
        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    # Assume this is an actual collection uuid, so fetch it directly.
    c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if 'portable_data_hash' in c:
        colhash = c['portable_data_hash']
    # (else branch, and the args.force guard, elided)
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    # (return the matching existing record; elided)
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    # (initialization of dst_manifest, dst_locators and bytes_written elided)
    bytes_expected = total_collection_size(manifest)
    # (the args.progress conditional choosing between these two is elided)
    progress_writer = ProgressWriter(human_progress)
    progress_writer = None

    for line in manifest.splitlines():
        # (words = line.split() and per-line newline handling elided)
        dst_manifest += words[0]
        for word in words[1:]:
            # (try: wrapper elided — a parse failure means `word` is a
            # filename token, handled by the branch below)
            loc = arvados.KeepLocator(word)
            # If 'word' can't be parsed as a locator,
            # presume it's a filename.
            dst_manifest += ' ' + word
            blockhash = loc.md5sum
            # copy this block if we haven't seen it before
            # (otherwise, just reuse the existing dst_locator)
            if blockhash not in dst_locators:
                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                data = src_keep.get(word)
                dst_locator = dst_keep.put(data)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
            dst_manifest += ' ' + dst_locators[blockhash]

    # (final progress report, guarded on progress_writer, elided)
    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
    progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)

    c['manifest_text'] = dst_manifest
    return create_collection_from(c, src, dst, args)
def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    """Pick a usable clone URL for the named hosted git repository.

    Returns a (git_url, git_config) tuple, where git_config is a list of
    extra `git -c ...` arguments needed to authenticate to that URL.
    Raises if no candidate URL is reachable, or if only an insecure
    http URL works and `allow_insecure_http` is not set.
    """
    # NOTE(review): the loop over candidate URLs, its try/except success
    # bookkeeping, and the final git_url/git_config selection are elided
    # in this view; indentation below is approximate.
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    # Prefer https, then non-http schemes (e.g. ssh); plain http last.
    priority = https_url + other_url + http_url

        if url.startswith("http"):
            u = urllib.parse.urlsplit(url)
            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
            # Feed the API token as the git credential for this host.
            git_config = ["-c", "credential.%s/.username=none" % baseurl,
                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]

            logger.debug("trying %s", url)
            # Probe the URL without fetching anything.
            arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
                                     env={"HOME": os.environ["HOME"],
                                          "ARVADOS_API_TOKEN": api.api_token,
                                          "GIT_ASKPASS": "/bin/false"})
        except arvados.errors.CommandFailedError:
        raise Exception('Cannot access git repository, tried {}'

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        # (else: refuse the insecure URL)
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)
# copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
#
# Copies commits from git repository 'src_git_repo' on Arvados
# instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
# and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
# rather than a uuid).
#
# All commits will be copied to a destination branch named for the
# source repository URL.
#
# The destination repository must already exist.
#
# The user running this command must be authenticated
# to both repositories.
#
def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
    # Identify the fetch and push URLs for the git repositories.

    (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
    (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")

    logger.debug('src_git_url: {}'.format(src_git_url))
    logger.debug('dst_git_url: {}'.format(dst_git_url))

    # Sanitize URL+version into a valid branch name.
    dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))

    # Copy git commits from src repo to dst repo.
    if src_git_repo not in local_repo_dir:
        # First time this run: make a bare clone in a temp dir and point
        # a 'dst' remote at the destination repository.
        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
        arvados.util.run_command(
            ["git"] + src_git_config + ["clone", "--bare", src_git_url,
                                        local_repo_dir[src_git_repo]],
            cwd=os.path.dirname(local_repo_dir[src_git_repo]),
            env={"HOME": os.environ["HOME"],
                 "ARVADOS_API_TOKEN": src.api_token,
                 "GIT_ASKPASS": "/bin/false"})
        arvados.util.run_command(
            ["git", "remote", "add", "dst", dst_git_url],
            cwd=local_repo_dir[src_git_repo])
    # Create the destination branch at script_version and push it.
    arvados.util.run_command(
        ["git", "branch", dst_branch, script_version],
        cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
                             cwd=local_repo_dir[src_git_repo],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": dst.api_token,
                                  "GIT_ASKPASS": "/bin/false"})
def copy_docker_images(pipeline, src, dst, args):
    """Copy any docker images named in the pipeline components'
    runtime_constraints field from src to dst."""
    # NOTE(review): the opening line of the copy_docker_image(...) call is
    # elided in this view; the two argument lines below belong to it.
    logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
    for c_name, c_info in pipeline['components'].items():
        if ('runtime_constraints' in c_info and
            'docker_image' in c_info['runtime_constraints']):
                c_info['runtime_constraints']['docker_image'],
                c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.
    """
    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        # The "image name" is actually a Keep locator; copy it directly.
        dst_image_col = copy_collection(docker_image, src, dst, args)
    # (else: image not found anywhere)
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
# git_rev_parse(rev, repo)
#
# Returns the 40-character commit hash corresponding to 'rev' in
# git repository 'repo' (which must be the path of a local git
# repository).
def git_rev_parse(rev, repo):
    stdout, _stderr = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    # run_command returns raw output; trim the trailing newline.
    return stdout.strip()
# uuid_type(api, object_uuid)
#
# Returns the name of the class that object_uuid belongs to, based on
# the second field of the uuid. This function consults the api's
# schema to identify the object class.
#
# It returns a string such as 'Collection', 'PipelineInstance', etc.
#
# Special case: if handed a Keep locator hash, return 'Collection'.
#
def uuid_type(api, object_uuid):
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
        # A portable data hash always denotes a collection.
        return 'Collection'
    p = object_uuid.split('-')
    if len(p) == 3:
        # The middle field of an Arvados uuid is the object-type prefix;
        # match it against the uuidPrefix of each schema class.
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    # Unrecognized identifier.
    return None
def abort(msg, code=1):
    """Log an arv-copy error message and terminate with exit status `code`.

    As shown, the function fell through after logging, letting callers
    continue past a fatal error; the exit call restores abort semantics.
    """
    logger.info("arv-copy: %s", msg)
    sys.exit(code)
# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    """Return a machine-parseable one-line progress report.

    bytes_expected may be None (size unknown), reported as -1.
    """
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0], os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
    """Return a human-friendly, \\r-prefixed progress line.

    Shows MiB written / expected with a percentage when the total is
    known (non-zero); otherwise just the raw byte count.
    """
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter(object):
    """Write progress lines produced by a formatter function to stderr.

    As shown, the `outfile` attribute, part of report(), and finish()
    were missing (finish() is called from copy_collection); this restores
    the complete writer.
    """
    _progress_func = None
    # Destination stream; tests/callers may override per instance.
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        # No-op when constructed without a formatter.
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        # Terminate the \r-style progress line.
        self.outfile.write("\n")
# Script entry point; the call into the command implementation follows
# here in the full source (elided in this view).
if __name__ == '__main__':