1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
7 # Copies an object from Arvados instance src to instance dst.
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a pipeline instance, arv-copy copies the pipeline
12 # template, input collection, docker images, git repositories). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst. If either of these files is not found,
19 # arv-copy will issue an error.
from __future__ import division
from future import standard_library
from future.utils import listvalues
standard_library.install_aliases()
from past.builtins import basestring
from builtins import object

import argparse
import contextlib
import logging
import os
import re
import shutil
import sys
import tempfile
import urllib.parse

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
import ruamel.yaml as yaml

from arvados.api import OrderedJsonModel
from arvados._version import __version__
# Matches a full or abbreviated (1-40 hex digit) git commit hash.
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# local directory.
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
local_repo_dir = {}

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
src_owner_uuid = None
def main():
    """Entry point: parse arguments, then copy the requested object.

    Dispatches on the type of args.object_uuid (Collection,
    PipelineInstance, PipelineTemplate, or Workflow) and exits nonzero
    on failure.
    """
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--force-filters', action='store_true', default=False,
        help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
    copy_opts.add_argument(
        '--src', dest='source_arvados', required=True,
        help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--dst', dest='destination_arvados', required=True,
        help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
    copy_opts.add_argument(
        '--dst-git-repo', dest='dst_git_repo',
        help='The name of the destination git repository. Required when copying a pipeline recursively.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the pipeline should be copied.')
    copy_opts.add_argument(
        '--allow-git-http-src', action="store_true",
        help='Allow cloning git repositories over insecure http')
    copy_opts.add_argument(
        '--allow-git-http-dst', action="store_true",
        help='Allow pushing git repositories over insecure http')

    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)

    parser = argparse.ArgumentParser(
        description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados)
    dst_arv = api_for_instance(args.destination_arvados)

    # Default the destination project to the current user's home project.
    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)
    if t == 'Collection':
        set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
        result = copy_collection(args.object_uuid,
                                 src_arv, dst_arv,
                                 args)
    elif t == 'PipelineInstance':
        set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
        result = copy_pipeline_instance(args.object_uuid,
                                        src_arv, dst_arv,
                                        args)
    elif t == 'PipelineTemplate':
        set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
        result = copy_pipeline_template(args.object_uuid,
                                        src_arv, dst_arv, args)
    elif t == 'Workflow':
        set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
        result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
    else:
        abort("cannot copy object {} of type {}".format(args.object_uuid, t))

    # Clean up any outstanding temp git repositories.
    for d in listvalues(local_repo_dir):
        shutil.rmtree(d, ignore_errors=True)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if result is None or 'error_token' in result or 'uuid' not in result:
        logger.error("API server returned an error result: {}".format(result))
        exit(1)

    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)
def set_src_owner_uuid(resource, uuid, args):
    """Record the owner_uuid of the root object being copied.

    Fetches `uuid` through the given API `resource` collection and stores
    its owner_uuid in the module-level `src_owner_uuid`, which is later
    used by copy_collection() to disambiguate collections that share a
    portable data hash.
    """
    global src_owner_uuid
    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = c.get("owner_uuid")
# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name.
#
# If instance_name contains a slash, it is presumed to be a path
# (either local or absolute) to a file with Arvados configuration
# settings.
#
# Otherwise, it is presumed to be the name of a file in
# $HOME/.config/arvados/instance_name.conf
#
def api_for_instance(instance_name):
    if '/' in instance_name:
        config_file = instance_name
    else:
        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    try:
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        # Accept the usual truthy spellings for the insecure flag.
        api_is_insecure = (
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             model=OrderedJsonModel())
    else:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
    return client
# Check if git is available
def check_git_availability():
    """Abort with a helpful message if the git executable cannot be run."""
    try:
        arvados.util.run_command(['git', '--help'])
    except Exception:
        abort('git command is not available. Please ensure git is installed.')
# copy_pipeline_instance(pi_uuid, src, dst, args)
#
# Copies a pipeline instance identified by pi_uuid from src to dst.
#
# If the args.recursive option is set:
#   1. Copies all input collections
#        * For each component in the pipeline, include all collections
#          listed as job dependencies for that component)
#   2. Copy docker images
#   3. Copy git repositories
#   4. Copy the pipeline template
#
# The only changes made to the copied pipeline instance are:
#   1. The original pipeline instance UUID is preserved in
#      the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
#   2. The pipeline_template_uuid is changed to the new template uuid.
#   3. The owner_uuid of the instance is changed to the user who
#      copied it.
#
def copy_pipeline_instance(pi_uuid, src, dst, args):
    # Fetch the pipeline instance record.
    pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)

    if args.recursive:
        check_git_availability()

        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy the pipeline template and save the copied template.
        if pi.get('pipeline_template_uuid', None):
            pt = copy_pipeline_template(pi['pipeline_template_uuid'],
                                        src, dst, args)

        # Copy input collections, docker images and git repos.
        pi = copy_collections(pi, src, dst, args)
        copy_git_repos(pi, src, dst, args.dst_git_repo, args)
        copy_docker_images(pi, src, dst, args)

        # Update the fields of the pipeline instance with the copied
        # pipeline template.
        if pi.get('pipeline_template_uuid', None):
            pi['pipeline_template_uuid'] = pt['uuid']

    else:
        # not recursive
        logger.info("Copying only pipeline instance %s.", pi_uuid)
        logger.info("You are responsible for making sure all pipeline dependencies have been updated.")

    # Update the pipeline instance properties, and create the new
    # instance at dst.
    pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
    pi['description'] = "Pipeline copied from {}\n\n{}".format(
        pi_uuid,
        pi['description'] if pi.get('description', None) else '')

    pi['owner_uuid'] = args.project_uuid

    del pi['uuid']

    new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
    return new_pi
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, basestring):
        yield arg
    else:
        for x in arg:
            yield x
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository.  If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes.  If they're not, it raises ValueError to indicate that
    the filter is not portable.  It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False.

    The special filter field 'any' matches every attribute.
    """
    return any((name == 'any') or (name in attr_names)
               for name in filter_iter(filter_[0]))
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)
def migrate_components_filters(template_components, dst_git_repo):
    """Update template component filters in-place for the destination.

    template_components is a dictionary of components in a pipeline template.
    This method walks over each component's filters, and updates them to have
    identical semantics on the destination cluster.  It returns a list of
    error strings that describe what filters could not be updated safely.

    dst_git_repo is the name of the destination Git repository, which can
    be None if that is not known.
    """
    errors = []
    for cname, cspec in template_components.items():
        def add_error(errmsg):
            errors.append("{}: {}".format(cname, errmsg))
        if not isinstance(cspec, dict):
            add_error("value is not a component definition")
            continue
        src_repository = cspec.get('repository')
        filters = cspec.get('filters', [])
        if not isinstance(filters, list):
            add_error("filters are not a list")
            continue
        for cfilter in filters:
            if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
                add_error("malformed filter {!r}".format(cfilter))
                continue
            if attr_filtered(cfilter, 'repository'):
                with exception_handler(add_error, ValueError):
                    migrate_repository_filter(cfilter, src_repository, dst_git_repo)
            if attr_filtered(cfilter, 'script_version'):
                with exception_handler(add_error, ValueError):
                    migrate_script_version_filter(cfilter)
    return errors
# copy_pipeline_template(pt_uuid, src, dst, args)
#
# Copies a pipeline template identified by pt_uuid from src to dst.
#
# If args.recursive is True, also copy any collections, docker
# images and git repositories that this template references.
#
# The owner_uuid of the new template is changed to that of the user
# who copied the template.
#
# Returns the copied pipeline template object.
#
def copy_pipeline_template(pt_uuid, src, dst, args):
    # fetch the pipeline template from the source instance
    pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)

    if not args.force_filters:
        filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
        if filter_errors:
            abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
                  "\n".join(filter_errors))

    if args.recursive:
        check_git_availability()

        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy input collections, docker images and git repos.
        pt = copy_collections(pt, src, dst, args)
        copy_git_repos(pt, src, dst, args.dst_git_repo, args)
        copy_docker_images(pt, src, dst, args)

    pt['description'] = "Pipeline template copied from {}\n\n{}".format(
        pt_uuid,
        pt['description'] if pt.get('description', None) else '')
    pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
    del pt['uuid']

    pt['owner_uuid'] = args.project_uuid

    return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
# copy_workflow(wf_uuid, src, dst, args)
#
# Copies a workflow identified by wf_uuid from src to dst.
#
# If args.recursive is True, also copy any collections
# referenced in the workflow definition yaml.
#
# The owner_uuid of the new workflow is set to any given
# project_uuid or the user who copied the template.
#
# Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    # copy collections and docker images
    if args.recursive:
        wf_def = yaml.safe_load(wf["definition"])
        if wf_def is not None:
            locations = []
            docker_images = {}
            graph = wf_def.get('$graph', None)
            if graph is not None:
                workflow_collections(graph, locations, docker_images)
            else:
                workflow_collections(wf_def, locations, docker_images)

            if locations:
                copy_collections(locations, src, dst, args)

            for image in docker_images:
                copy_docker_image(image, docker_images[image], src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid
    return dst.workflows().create(body=wf).execute(num_retries=args.retries)
def workflow_collections(obj, locations, docker_images):
    """Recursively scan a workflow definition for Keep and Docker references.

    Appends any "keep:..." locations found to `locations` (without the
    prefix), and records dockerImageId/dockerPull references in the
    `docker_images` dict as image-name -> tag (defaulting to 'latest').
    """
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds)==2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)
# copy_collections(obj, src, dst, args)
#
# Recursively copies all collections referenced by 'obj' from src
# to dst.  obj may be a dict or a list, in which case we run
# copy_collections on every value it contains.  If it is a string,
# search it for any substring that matches a collection hash or uuid
# (this will find hidden references to collections like
#   "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
# Returns a copy of obj with any old collection uuids replaced by
# the new ones.
#
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).

        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj
def migrate_jobspec(jobspec, src, dst, dst_repo, args):
    """Copy a job's script to the destination repository, and update its record.

    Given a jobspec dictionary, this function finds the referenced script from
    src and copies it to dst and dst_repo.  It also updates jobspec in place to
    refer to names on the destination.
    """
    repo = jobspec.get('repository')
    if repo is None:
        return
    # script_version is the "script_version" parameter from the source
    # component or job.  If no script_version was supplied in the
    # component or job, it is a mistake in the pipeline, but for the
    # purposes of copying the repository, default to "master".
    script_version = jobspec.get('script_version') or 'master'
    script_key = (repo, script_version)
    if script_key not in scripts_copied:
        copy_git_repo(repo, src, dst, dst_repo, script_version, args)
        scripts_copied.add(script_key)
    jobspec['repository'] = dst_repo
    repo_dir = local_repo_dir[repo]
    for version_key in ['script_version', 'supplied_script_version']:
        if version_key in jobspec:
            jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
# copy_git_repos(p, src, dst, dst_repo, args)
#
# Copies all git repositories referenced by pipeline instance or
# template 'p' from src to dst.
#
# For each component c in the pipeline:
#   * Copy git repositories named in c['repository'] and c['job']['repository'] if present
#   * Rename script versions:
#       * c['script_version']
#       * c['job']['script_version']
#       * c['job']['supplied_script_version']
#     to the commit hashes they resolve to, since any symbolic
#     names (tags, branches) are not preserved in the destination repo.
#
# The pipeline object is updated in place with the new repository
# names.  The return value is undefined.
#
def copy_git_repos(p, src, dst, dst_repo, args):
    for component in p['components'].values():
        migrate_jobspec(component, src, dst, dst_repo, args)
        if 'job' in component:
            migrate_jobspec(component['job'], src, dst, dst_repo, args)
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    del c['uuid']

    if not c["name"]:
        c['name'] = "copied from " + collection_uuid

    if 'properties' in c:
        del c['properties']

    c['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
# copy_collection(obj_uuid, src, dst, args)
#
# Copies the collection identified by obj_uuid from src to dst.
# Returns the collection object created at dst.
#
# If args.progress is True, produce a human-friendly progress
# report.
#
# If a collection with the desired portable_data_hash already
# exists at dst, and args.force is False, copy_collection returns
# the existing collection without copying any blocks.  Otherwise
# (if no collection exists or if args.force is True)
# copy_collection copies all of the collection data blocks from src
# to dst.
#
# For this application, it is critical to preserve the
# collection's manifest hash, which is not guaranteed with the
# arvados.CollectionReader and arvados.CollectionWriter classes.
# Copying each block in the collection manually, followed by
# the manifest block, ensures that the collection's manifest
# hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection.  As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
            ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a pipeline) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = ""
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest += words[0]
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest += ' ' + word
                continue
            blockhash = loc.md5sum
            # copy this block if we haven't seen it before
            # (otherwise, just reuse the existing dst_locator)
            if blockhash not in dst_locators:
                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                if progress_writer:
                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                data = src_keep.get(word)
                dst_locator = dst_keep.put(data)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
            dst_manifest += ' ' + dst_locators[blockhash]
        dst_manifest += "\n"

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)

    c['manifest_text'] = dst_manifest
    return create_collection_from(c, src, dst, args)
def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    """Pick a usable clone URL for the named repository on `api`.

    Tries the repository's clone_urls in order of preference (https,
    then non-http protocols, then plain http) and returns a tuple of
    (git_url, git_config) where git_config is the extra `git -c ...`
    arguments needed to authenticate over http(s).  Raises Exception if
    the repo cannot be identified or no URL is reachable, or if only an
    insecure http URL works and allow_insecure_http is not set.
    """
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    priority = https_url + other_url + http_url

    git_config = []
    git_url = None

    for url in priority:
        if url.startswith("http"):
            u = urllib.parse.urlsplit(url)
            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
            # Supply the API token as the http password via a credential helper.
            git_config = ["-c", "credential.%s/.username=none" % baseurl,
                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
        else:
            git_config = []

        try:
            logger.debug("trying %s", url)
            arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
                                     env={"HOME": os.environ["HOME"],
                                          "ARVADOS_API_TOKEN": api.api_token,
                                          "GIT_ASKPASS": "/bin/false"})
        except arvados.errors.CommandFailedError:
            pass
        else:
            git_url = url
            break

    if not git_url:
        raise Exception('Cannot access git repository, tried {}'
                        .format(priority))

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        else:
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)
# copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
#
#    Copies commits from git repository 'src_git_repo' on Arvados
#    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
#    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
#    rather than a uuid string).
#
#    All commits will be copied to a destination branch named for the
#    source repository URL.
#
#    The destination repository must already exist.
#
#    The user running this command must be authenticated
#    to both repositories.
#
def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
    # Identify the fetch and push URLs for the git repositories.

    (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
    (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")

    logger.debug('src_git_url: {}'.format(src_git_url))
    logger.debug('dst_git_url: {}'.format(dst_git_url))

    dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))

    # Copy git commits from src repo to dst repo.
    if src_git_repo not in local_repo_dir:
        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
        arvados.util.run_command(
            ["git"] + src_git_config + ["clone", "--bare", src_git_url,
             local_repo_dir[src_git_repo]],
            cwd=os.path.dirname(local_repo_dir[src_git_repo]),
            env={"HOME": os.environ["HOME"],
                 "ARVADOS_API_TOKEN": src.api_token,
                 "GIT_ASKPASS": "/bin/false"})
        arvados.util.run_command(
            ["git", "remote", "add", "dst", dst_git_url],
            cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(
        ["git", "branch", dst_branch, script_version],
        cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
                             cwd=local_repo_dir[src_git_repo],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": dst.api_token,
                                  "GIT_ASKPASS": "/bin/false"})
def copy_docker_images(pipeline, src, dst, args):
    """Copy any docker images named in the pipeline components'
    runtime_constraints field from src to dst."""

    logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
    for c_name, c_info in pipeline['components'].items():
        if ('runtime_constraints' in c_info and
            'docker_image' in c_info['runtime_constraints']):
            copy_docker_image(
                c_info['runtime_constraints']['docker_image'],
                c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
                src, dst, args)
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst.  Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        # The "image name" is already a Keep locator; copy it directly.
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
# git_rev_parse(rev, repo)
#
#    Returns the 40-character commit hash corresponding to 'rev' in
#    git repository 'repo' (which must be the path of a local git
#    repository)
#
def git_rev_parse(rev, repo):
    gitout, giterr = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return gitout.strip()
# uuid_type(api, object_uuid)
#
#    Returns the name of the class that object_uuid belongs to, based on
#    the second field of the uuid.  This function consults the api's
#    schema to identify the object class.
#
#    It returns a string such as 'Collection', 'PipelineInstance', etc.
#
#    Special case: if handed a Keep locator hash, return 'Collection'.
#
def uuid_type(api, object_uuid):
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
        return 'Collection'
    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
def abort(msg, code=1):
    """Log msg and terminate the program with the given exit code."""
    logger.info("arv-copy: %s", msg)
    exit(code)
# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    """Return a machine-parseable one-line progress report.

    Unknown byte counts are reported as -1.
    """
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        -1 if (bytes_written is None) else bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
    """Return a human-friendly progress line (carriage-return style).

    Shows megabytes and a percentage when the expected size is known,
    otherwise just the raw byte count.
    """
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter(object):
    """Write progress lines produced by a formatter function to stderr.

    The formatter (e.g. human_progress or machine_progress) is called
    with (obj_uuid, bytes_written, bytes_expected) on each report().
    """
    # Formatter producing each progress line; None disables reporting.
    _progress_func = None
    # Output stream; a class attribute so tests can substitute a buffer.
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        # Terminate the carriage-return style progress line.
        self.outfile.write("\n")
if __name__ == '__main__':
    main()