1 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
3 # Copies an object from Arvados instance src to instance dst.
5 # By default, arv-copy recursively copies any dependent objects
6 # necessary to make the object functional in the new instance
7 # (e.g. for a pipeline instance, arv-copy copies the pipeline
8 # template, input collection, docker images, git repositories). If
9 # --no-recursive is given, arv-copy copies only the single record
10 # identified by object-uuid.
12 # The user must have files $HOME/.config/arvados/{src}.conf and
13 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
14 # instances src and dst. If either of these files is not found,
15 # arv-copy will issue an error.
17 from __future__ import division
18 from future import standard_library
19 standard_library.install_aliases()
20 from past.builtins import basestring
21 from builtins import object
37 import arvados.commands._util as arv_cmd
38 import arvados.commands.keepdocker
39 import ruamel.yaml as yaml
41 from arvados.api import OrderedJsonModel
42 from arvados._version import __version__
44 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
46 logger = logging.getLogger('arvados.arv-copy')
# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and the local
# directories to which they were cloned:
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
56 # List of collections that have been copied in this session, and their
57 # destination collection UUIDs.
58 collections_copied = {}
60 # Set of (repository, script_version) two-tuples of commits copied in git.
61 scripts_copied = set()
63 # The owner_uuid of the object being copied
67 copy_opts = argparse.ArgumentParser(add_help=False)
69 copy_opts.add_argument(
70 '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
71 help='Print version and exit.')
72 copy_opts.add_argument(
73 '-v', '--verbose', dest='verbose', action='store_true',
74 help='Verbose output.')
75 copy_opts.add_argument(
76 '--progress', dest='progress', action='store_true',
77 help='Report progress on copying collections. (default)')
78 copy_opts.add_argument(
79 '--no-progress', dest='progress', action='store_false',
80 help='Do not report progress on copying collections.')
81 copy_opts.add_argument(
82 '-f', '--force', dest='force', action='store_true',
83 help='Perform copy even if the object appears to exist at the remote destination.')
84 copy_opts.add_argument(
85 '--force-filters', action='store_true', default=False,
86 help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
87 copy_opts.add_argument(
88 '--src', dest='source_arvados', required=True,
89 help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
90 copy_opts.add_argument(
91 '--dst', dest='destination_arvados', required=True,
92 help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
93 copy_opts.add_argument(
94 '--recursive', dest='recursive', action='store_true',
95 help='Recursively copy any dependencies for this object. (default)')
96 copy_opts.add_argument(
97 '--no-recursive', dest='recursive', action='store_false',
98 help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
99 copy_opts.add_argument(
100 '--dst-git-repo', dest='dst_git_repo',
101 help='The name of the destination git repository. Required when copying a pipeline recursively.')
102 copy_opts.add_argument(
103 '--project-uuid', dest='project_uuid',
104 help='The UUID of the project at the destination to which the pipeline should be copied.')
105 copy_opts.add_argument(
106 '--allow-git-http-src', action="store_true",
107 help='Allow cloning git repositories over insecure http')
108 copy_opts.add_argument(
109 '--allow-git-http-dst', action="store_true",
110 help='Allow pushing git repositories over insecure http')
112 copy_opts.add_argument(
114 help='The UUID of the object to be copied.')
115 copy_opts.set_defaults(progress=True)
116 copy_opts.set_defaults(recursive=True)
118 parser = argparse.ArgumentParser(
119 description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
120 parents=[copy_opts, arv_cmd.retry_opt])
121 args = parser.parse_args()
124 logger.setLevel(logging.DEBUG)
126 logger.setLevel(logging.INFO)
128 # Create API clients for the source and destination instances
129 src_arv = api_for_instance(args.source_arvados)
130 dst_arv = api_for_instance(args.destination_arvados)
132 if not args.project_uuid:
133 args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
135 # Identify the kind of object we have been given, and begin copying.
136 t = uuid_type(src_arv, args.object_uuid)
137 if t == 'Collection':
138 set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
139 result = copy_collection(args.object_uuid,
142 elif t == 'PipelineInstance':
143 set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
144 result = copy_pipeline_instance(args.object_uuid,
147 elif t == 'PipelineTemplate':
148 set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
149 result = copy_pipeline_template(args.object_uuid,
150 src_arv, dst_arv, args)
151 elif t == 'Workflow':
152 set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
153 result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
155 abort("cannot copy object {} of type {}".format(args.object_uuid, t))
157 # Clean up any outstanding temp git repositories.
158 for d in list(local_repo_dir.values()):
159 shutil.rmtree(d, ignore_errors=True)
161 # If no exception was thrown and the response does not have an
162 # error_token field, presume success
163 if 'error_token' in result or 'uuid' not in result:
164 logger.error("API server returned an error result: {}".format(result))
168 logger.info("Success: created copy with uuid {}".format(result['uuid']))
def set_src_owner_uuid(resource, uuid, args):
    """Fetch record `uuid` through `resource` and remember its owner.

    Stores the record's owner_uuid in the module-global src_owner_uuid,
    which is later consulted when choosing among candidate collections.
    """
    global src_owner_uuid
    record = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
176 # api_for_instance(instance_name)
178 # Creates an API client for the Arvados instance identified by
181 # If instance_name contains a slash, it is presumed to be a path
182 # (either local or absolute) to a file with Arvados configuration
185 # Otherwise, it is presumed to be the name of a file in
186 # $HOME/.config/arvados/instance_name.conf
def api_for_instance(instance_name):
    """Return an Arvados API client for the named instance.

    instance_name is either a path to a config file (when it contains a
    slash) or a short name resolved to
    $HOME/.config/arvados/{instance_name}.conf.  Aborts if the config
    file cannot be read or lacks ARVADOS_API_HOST / ARVADOS_API_TOKEN.

    NOTE(review): several structural lines (an else branch, the
    try keyword, and the return of the client) appear to be missing from
    this excerpt — confirm against the full source.
    """
    if '/' in instance_name:
        # A slash means the caller supplied an explicit config file path.
        config_file = instance_name
    config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        # Give the user enough context to fix a missing/unreadable file.
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        # Accept the usual truthy spellings for the insecure-TLS flag.
        cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
            ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             model=OrderedJsonModel())
    abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
# Check if git is available
def check_git_availability():
    """Abort with a helpful message unless the `git` command runs.

    NOTE(review): a try/except around run_command appears to be missing
    from this excerpt; as shown, abort() would run unconditionally.
    """
    # `git --help` exits 0 whenever a runnable git binary is on PATH.
    arvados.util.run_command(['git', '--help'])
    abort('git command is not available. Please ensure git is installed.')
223 # copy_pipeline_instance(pi_uuid, src, dst, args)
225 # Copies a pipeline instance identified by pi_uuid from src to dst.
227 # If the args.recursive option is set:
228 # 1. Copies all input collections
229 # * For each component in the pipeline, include all collections
#      listed as job dependencies for that component
231 # 2. Copy docker images
232 # 3. Copy git repositories
233 # 4. Copy the pipeline template
235 # The only changes made to the copied pipeline instance are:
236 # 1. The original pipeline instance UUID is preserved in
237 # the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
238 # 2. The pipeline_template_uuid is changed to the new template uuid.
239 # 3. The owner_uuid of the instance is changed to the user who
def copy_pipeline_instance(pi_uuid, src, dst, args):
    """Copy the pipeline instance pi_uuid from src to dst.

    With args.recursive, also copies input collections, docker images,
    git repositories and the pipeline template.  The copy records its
    provenance in properties['copied_from_pipeline_instance_uuid'] and
    is reowned to args.project_uuid.

    NOTE(review): several lines appear to be missing from this excerpt
    (the args.recursive guard, the rest of the copy_pipeline_template
    call, and the final return of new_pi) — confirm against the full
    source.
    """
    # Fetch the pipeline instance record.
    pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)

    check_git_availability()

    if not args.dst_git_repo:
        abort('--dst-git-repo is required when copying a pipeline recursively.')
    # Copy the pipeline template and save the copied template.
    if pi.get('pipeline_template_uuid', None):
        pt = copy_pipeline_template(pi['pipeline_template_uuid'],

    # Copy input collections, docker images and git repos.
    pi = copy_collections(pi, src, dst, args)
    copy_git_repos(pi, src, dst, args.dst_git_repo, args)
    copy_docker_images(pi, src, dst, args)

    # Update the fields of the pipeline instance with the copied
    # template's uuid.
    if pi.get('pipeline_template_uuid', None):
        pi['pipeline_template_uuid'] = pt['uuid']

    logger.info("Copying only pipeline instance %s.", pi_uuid)
    logger.info("You are responsible for making sure all pipeline dependencies have been updated.")

    # Update the pipeline instance properties, and create the new
    # instance at dst.  Record which instance this was copied from.
    pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
    pi['description'] = "Pipeline copied from {}\n\n{}".format(
        pi['description'] if pi.get('description', None) else '')

    pi['owner_uuid'] = args.project_uuid

    new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.

    NOTE(review): the return/yield lines of this function appear to be
    missing from this excerpt — confirm against the full source.
    """
    if isinstance(arg, basestring):
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository. If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        # Simple equality match: rewrite the operand in place.
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        # Single-element "in" match: rewrite the operand list in place.
        repo_filter[2] = [dst_repository]
    # NOTE(review): an "else:" line appears to be missing here in this
    # excerpt; the raise below should fire only when no branch matched.
    raise ValueError("repository filter is not a simple source match")
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes. If they're not, it raises ValueError to indicate that
    the filter is not portable. It could be extended to make other
    transformations in the future.
    """
    # version_filter[2] is the operand; filter_iter handles both the
    # single-string and list forms.
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    # filter_[0] names the attribute(s) being filtered; 'any' matches all.
    for name in filter_iter(filter_[0]):
        if name == 'any' or name in attr_names:
            return True
    return False
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    # NOTE(review): the try/yield lines and the handler(error) call
    # appear to be missing from this excerpt — the body should yield
    # inside a try so the except clause below can route matching
    # exceptions to handler.
    except exc_types as error:
def migrate_components_filters(template_components, dst_git_repo):
    """Update template component filters in-place for the destination.

    template_components is a dictionary of components in a pipeline template.
    This method walks over each component's filters, and updates them to have
    identical semantics on the destination cluster. It returns a list of
    error strings that describe what filters could not be updated safely.

    dst_git_repo is the name of the destination Git repository, which can
    be None if that is not known.

    NOTE(review): the initialization of `errors`, several `continue`
    statements, and the final `return errors` appear to be missing from
    this excerpt — confirm against the full source.
    """
    for cname, cspec in template_components.items():
        def add_error(errmsg):
            # Prefix each message with the component name for context.
            errors.append("{}: {}".format(cname, errmsg))
        if not isinstance(cspec, dict):
            add_error("value is not a component definition")
        src_repository = cspec.get('repository')
        filters = cspec.get('filters', [])
        if not isinstance(filters, list):
            add_error("filters are not a list")
        for cfilter in filters:
            # Each filter must be a [attr, operator, operand] triple.
            if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
                add_error("malformed filter {!r}".format(cfilter))
            if attr_filtered(cfilter, 'repository'):
                # Collect ValueErrors as error strings instead of raising.
                with exception_handler(add_error, ValueError):
                    migrate_repository_filter(cfilter, src_repository, dst_git_repo)
            if attr_filtered(cfilter, 'script_version'):
                with exception_handler(add_error, ValueError):
                    migrate_script_version_filter(cfilter)
373 # copy_pipeline_template(pt_uuid, src, dst, args)
375 # Copies a pipeline template identified by pt_uuid from src to dst.
377 # If args.recursive is True, also copy any collections, docker
378 # images and git repositories that this template references.
380 # The owner_uuid of the new template is changed to that of the user
381 # who copied the template.
383 # Returns the copied pipeline template object.
def copy_pipeline_template(pt_uuid, src, dst, args):
    """Copy pipeline template pt_uuid from src to dst; return the new record.

    With args.recursive, referenced collections, docker images and git
    repositories are copied as well.  The copy is renamed to include its
    provenance and reowned to args.project_uuid.

    NOTE(review): some guard lines (e.g. "if filter_errors:" before the
    abort, and the args.recursive check) appear to be missing from this
    excerpt — confirm against the full source.
    """
    # fetch the pipeline template from the source instance
    pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)

    if not args.force_filters:
        # Rewrite filters for the destination; abort on anything that
        # cannot be migrated safely (unless --force-filters was given).
        filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
        abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
              "\n".join(filter_errors))

    check_git_availability()

    if not args.dst_git_repo:
        abort('--dst-git-repo is required when copying a pipeline recursively.')
    # Copy input collections, docker images and git repos.
    pt = copy_collections(pt, src, dst, args)
    copy_git_repos(pt, src, dst, args.dst_git_repo, args)
    copy_docker_images(pt, src, dst, args)

    pt['description'] = "Pipeline template copied from {}\n\n{}".format(
        pt['description'] if pt.get('description', None) else '')
    # Rename so the copy is distinguishable from the original.
    pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)

    pt['owner_uuid'] = args.project_uuid

    return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
415 # copy_workflow(wf_uuid, src, dst, args)
417 # Copies a workflow identified by wf_uuid from src to dst.
419 # If args.recursive is True, also copy any collections
420 # referenced in the workflow definition yaml.
422 # The owner_uuid of the new workflow is set to any given
423 # project_uuid or the user who copied the template.
425 # Returns the copied workflow object.
def copy_workflow(wf_uuid, src, dst, args):
    """Copy workflow wf_uuid from src to dst; return the new workflow.

    Collections and docker images referenced in the workflow definition
    YAML are copied as well (per the header comment, when
    args.recursive is set).

    NOTE(review): the initialization of `locations` / `docker_images`
    and some branch lines appear to be missing from this excerpt —
    confirm against the full source.
    """
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    # copy collections and docker images
    wf_def = yaml.safe_load(wf["definition"])
    if wf_def is not None:
        # A packed CWL document keeps its processes under "$graph";
        # scan it if present, otherwise scan the definition itself.
        graph = wf_def.get('$graph', None)
        if graph is not None:
            workflow_collections(graph, locations, docker_images)
        workflow_collections(wf_def, locations, docker_images)

        copy_collections(locations, src, dst, args)

        for image in docker_images:
            copy_docker_image(image, docker_images[image], src, dst, args)

    # copy the workflow itself
    wf['owner_uuid'] = args.project_uuid
    return dst.workflows().create(body=wf).execute(num_retries=args.retries)
def workflow_collections(obj, locations, docker_images):
    """Recursively scan a workflow definition for Keep references.

    Appends bare Keep locators found in 'location' fields to
    `locations`, and records docker image name -> tag pairs from
    'dockerImageId' / 'dockerPull' fields into `docker_images`.

    NOTE(review): some lines (a None-check on `loc`, and the
    "for x in obj" loop headers for the recursive calls) appear to be
    missing from this excerpt — confirm against the full source.
    """
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc.startswith("keep:"):
            # Strip the "keep:" scheme, leaving the bare locator.
            locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            # An image reference without an explicit tag means "latest".
            tag = ds[1] if len(ds)==2 else 'latest'
            docker_images[ds[0]] = tag

            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
            workflow_collections(x, locations, docker_images)
473 # copy_collections(obj, src, dst, args)
475 # Recursively copies all collections referenced by 'obj' from src
476 # to dst. obj may be a dict or a list, in which case we run
477 # copy_collections on every value it contains. If it is a string,
478 # search it for any substring that matches a collection hash or uuid
479 # (this will find hidden references to collections like
480 # "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
482 # Returns a copy of obj with any old collection uuids replaced by
def copy_collections(obj, src, dst, args):
    """Recursively copy all collections referenced by `obj` from src to dst.

    obj may be a dict, list or string.  Strings are searched for any
    substring matching a collection hash or uuid, which also finds
    hidden references like "$(file <pdh>/path)".  Returns a copy of obj
    with old collection identifiers replaced by the new ones.

    NOTE(review): a few lines (an else branch in the helper, and the
    final returns for the string/default cases) appear to be missing
    from this excerpt — confirm against the full source.
    """
    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination. Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            # First time we see this identifier in this run: copy it and
            # memoize the result in the module-level cache.
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                # Identifier is unchanged at dst (e.g. a portable data
                # hash): keep it as-is.
                collections_copied[src_id] = src_id
            collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
    elif isinstance(obj, dict):
        # Preserve the concrete mapping type (e.g. OrderedDict).
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
def migrate_jobspec(jobspec, src, dst, dst_repo, args):
    """Copy a job's script to the destination repository, and update its record.

    Given a jobspec dictionary, this function finds the referenced script from
    src and copies it to dst and dst_repo. It also updates jobspec in place to
    refer to names on the destination.

    NOTE(review): a guard for a jobspec with no repository appears to be
    missing from this excerpt — confirm against the full source.
    """
    repo = jobspec.get('repository')
    # script_version is the "script_version" parameter from the source
    # component or job. If no script_version was supplied in the
    # component or job, it is a mistake in the pipeline, but for the
    # purposes of copying the repository, default to "master".
    script_version = jobspec.get('script_version') or 'master'
    script_key = (repo, script_version)
    if script_key not in scripts_copied:
        # Copy each (repository, version) pair at most once per run.
        copy_git_repo(repo, src, dst, dst_repo, script_version, args)
        scripts_copied.add(script_key)
    jobspec['repository'] = dst_repo
    repo_dir = local_repo_dir[repo]
    # Symbolic names (tags, branches) are not preserved at dst, so pin
    # every version field to the commit hash it resolves to.
    for version_key in ['script_version', 'supplied_script_version']:
        if version_key in jobspec:
            jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
541 # copy_git_repos(p, src, dst, dst_repo, args)
543 # Copies all git repositories referenced by pipeline instance or
544 # template 'p' from src to dst.
546 # For each component c in the pipeline:
547 # * Copy git repositories named in c['repository'] and c['job']['repository'] if present
548 # * Rename script versions:
549 # * c['script_version']
550 # * c['job']['script_version']
551 # * c['job']['supplied_script_version']
552 # to the commit hashes they resolve to, since any symbolic
553 # names (tags, branches) are not preserved in the destination repo.
555 # The pipeline object is updated in place with the new repository
556 # names. The return value is undefined.
def copy_git_repos(p, src, dst, dst_repo, args):
    """Copy every git repository referenced by pipeline p's components.

    Updates p in place via migrate_jobspec: each component (and its
    embedded 'job', when present) gets its repository and script
    versions rewritten for the destination.
    """
    for cspec in p['components'].values():
        migrate_jobspec(cspec, src, dst, dst_repo, args)
        if 'job' in cspec:
            migrate_jobspec(cspec['job'], src, dst, dst_repo, args)
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks).

    NOTE(review): the accumulator initializations, the line split, the
    try/except around KeepLocator parsing, and the final return appear
    to be missing from this excerpt — confirm against the full source.
    """
    for line in manifest_text.splitlines():
        # Manifest format: the first word of each line is the stream
        # name; the remaining words are block locators and file tokens.
        for word in words[1:]:
            loc = arvados.KeepLocator(word)
            continue  # this word isn't a locator, skip it
            # Count each distinct block (by md5) only once.
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available.

    The new record is renamed to show its provenance, reowned to
    args.project_uuid, and any docker_image_repo+tag /
    docker_image_hash links pointing at the source collection are
    recreated at the destination.  Returns the new collection record.

    NOTE(review): the suite of the "if 'properties' in c:" branch
    appears to be missing from this excerpt — confirm against the full
    source.
    """
    collection_uuid = c['uuid']
    # Rename so the copy's provenance is visible in its name.
    c['name'] = "copied from " + collection_uuid

    if 'properties' in c:
    c['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            # Copy only the fields that are meaningful at the
            # destination; head/owner are rewritten below.
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
616 # copy_collection(obj_uuid, src, dst, args)
618 # Copies the collection identified by obj_uuid from src to dst.
619 # Returns the collection object created at dst.
621 # If args.progress is True, produce a human-friendly progress
624 # If a collection with the desired portable_data_hash already
625 # exists at dst, and args.force is False, copy_collection returns
626 # the existing collection without copying any blocks. Otherwise
627 # (if no collection exists or if args.force is True)
628 # copy_collection copies all of the collection data blocks from src
631 # For this application, it is critical to preserve the
632 # collection's manifest hash, which is not guaranteed with the
633 # arvados.CollectionReader and arvados.CollectionWriter classes.
634 # Copying each block in the collection manually, followed by
635 # the manifest block, ensures that the collection's manifest
636 # hash will not change.
def copy_collection(obj_uuid, src, dst, args):
    """Copy the collection identified by obj_uuid from src to dst.

    obj_uuid may be a collection uuid or a portable data hash.  Each
    data block is copied through Keep individually and the manifest is
    rebuilt from the destination's signed locators, so the collection's
    manifest hash is preserved.  Returns the collection record created
    at dst (via create_collection_from).

    NOTE(review): a number of lines (branch keywords, accumulator
    initializations such as dst_manifest/dst_locators/bytes_written,
    and several loop/return lines) appear to be missing from this
    excerpt — confirm against the full source.
    """
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be uniquely
        # identified with a particular collection. As a result, it is
        # ambiguous as to what name to use for the copy. Apply some heuristics
        # to pick which collection to get the name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")

        logger.warning("Could not find collection with portable data hash %s", obj_uuid)

        # There's only one collection with the PDH, so use that.

        # See if there is a collection that's in the same project
        # as the root item (usually a pipeline) being copied.
        if i.get("owner_uuid") == src_owner_uuid and i.get("name"):

        # Didn't find any collections located in the same project, so
        # pick the oldest collection that has a name assigned to it.

        # None of the collections have names (?!), so just pick the

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    # Assume this is an actual collection uuid, so fetch it directly.
    c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if 'portable_data_hash' in c:
        colhash = c['portable_data_hash']

        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                # Only reuse an exact match: same owner, name and content.
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):

            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)

    bytes_expected = total_collection_size(manifest)

    # Progress reporting is optional (see --progress/--no-progress).
    progress_writer = ProgressWriter(human_progress)

    progress_writer = None

    for line in manifest.splitlines():
        dst_manifest += words[0]
        for word in words[1:]:
            loc = arvados.KeepLocator(word)
            # If 'word' can't be parsed as a locator,
            # presume it's a filename.
            dst_manifest += ' ' + word

            blockhash = loc.md5sum
            # copy this block if we haven't seen it before
            # (otherwise, just reuse the existing dst_locator)
            if blockhash not in dst_locators:
                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                data = src_keep.get(word)
                dst_locator = dst_keep.put(data)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
            dst_manifest += ' ' + dst_locators[blockhash]

    # Emit a final progress line before saving the record.
    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
    progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)

    c['manifest_text'] = dst_manifest
    return create_collection_from(c, src, dst, args)
def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    """Pick a usable clone URL for repo_name on the given API server.

    Candidate URLs are tried in order https, then non-http schemes,
    then plain http.  Plain-http URLs are refused unless
    allow_insecure_http is set; allow_insecure_http_opt names the CLI
    flag to mention in warnings/errors.  Returns a (git_url, git_config)
    tuple, where git_config is a list of extra "git -c ..." arguments
    used to authenticate over http(s).

    NOTE(review): the loop over `priority`, the success/failure
    bookkeeping, and some branch keywords appear to be missing from
    this excerpt — confirm against the full source.
    """
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    # Try the most secure transports first.
    priority = https_url + other_url + http_url

    if url.startswith("http"):
        u = urllib.parse.urlsplit(url)
        baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
        # Feed the API token to git as the password via an inline
        # credential helper, so it is never written to disk.
        git_config = ["-c", "credential.%s/.username=none" % baseurl,
                      "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]

    logger.debug("trying %s", url)
    # Probe the URL with ls-remote before committing to it.
    arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": api.api_token,
                                  "GIT_ASKPASS": "/bin/false"})
    except arvados.errors.CommandFailedError:

    raise Exception('Cannot access git repository, tried {}'

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)
812 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
814 # Copies commits from git repository 'src_git_repo' on Arvados
815 # instance 'src' to 'dst_git_repo' on 'dst'. Both src_git_repo
816 # and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
819 # All commits will be copied to a destination branch named for the
820 # source repository URL.
822 # The destination repository must already exist.
824 # The user running this command must be authenticated
825 # to both repositories.
def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
    """Copy commits from src_git_repo on src to dst_git_repo on dst.

    script_version (a ref in the source repository) is pushed to a
    destination branch whose name is derived from the source URL and
    version.  The destination repository must already exist, and the
    user must be authenticated to both.
    """
    # Identify the fetch and push URLs for the git repositories.

    (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
    (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")

    logger.debug('src_git_url: {}'.format(src_git_url))
    logger.debug('dst_git_url: {}'.format(dst_git_url))

    # Branch name unique to this source URL + version; non-word
    # characters are collapsed to underscores to make a valid ref name.
    dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))

    # Copy git commits from src repo to dst repo.
    if src_git_repo not in local_repo_dir:
        # First use of this repository in this run: bare-clone it into a
        # temp dir (recorded in local_repo_dir; cleaned up in main) and
        # register the destination as a remote named "dst".
        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
        arvados.util.run_command(
            ["git"] + src_git_config + ["clone", "--bare", src_git_url,
                                        local_repo_dir[src_git_repo]],
            cwd=os.path.dirname(local_repo_dir[src_git_repo]),
            env={"HOME": os.environ["HOME"],
                 "ARVADOS_API_TOKEN": src.api_token,
                 "GIT_ASKPASS": "/bin/false"})
        arvados.util.run_command(
            ["git", "remote", "add", "dst", dst_git_url],
            cwd=local_repo_dir[src_git_repo])
    # Create the destination branch locally, then push it.
    arvados.util.run_command(
        ["git", "branch", dst_branch, script_version],
        cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
                             cwd=local_repo_dir[src_git_repo],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": dst.api_token,
                                  "GIT_ASKPASS": "/bin/false"})
def copy_docker_images(pipeline, src, dst, args):
    """Copy any docker images named in the pipeline components'
    runtime_constraints field from src to dst.

    NOTE(review): the copy_docker_image(...) call line appears to be
    truncated in this excerpt — confirm against the full source.
    """
    logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
    for c_name, c_info in pipeline['components'].items():
        if ('runtime_constraints' in c_info and
            'docker_image' in c_info['runtime_constraints']):
            c_info['runtime_constraints']['docker_image'],
            # Default to the "latest" tag when none was recorded.
            c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    NOTE(review): an "else:" line before the final warning appears to be
    missing from this excerpt — confirm against the full source.
    """
    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        # The "image name" is actually a Keep locator: copy it directly.
        dst_image_col = copy_collection(docker_image, src, dst, args)
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
897 # git_rev_parse(rev, repo)
899 # Returns the 40-character commit hash corresponding to 'rev' in
900 # git repository 'repo' (which must be the path of a local git
def git_rev_parse(rev, repo):
    """Resolve `rev` to a full commit hash in the local git repo at `repo`.

    Runs `git rev-parse` in the repository directory and returns its
    stripped stdout.
    """
    stdout, _stderr = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return stdout.strip()
908 # uuid_type(api, object_uuid)
910 # Returns the name of the class that object_uuid belongs to, based on
911 # the second field of the uuid. This function consults the api's
912 # schema to identify the object class.
914 # It returns a string such as 'Collection', 'PipelineInstance', etc.
916 # Special case: if handed a Keep locator hash, return 'Collection'.
def uuid_type(api, object_uuid):
    """Return the schema class name for object_uuid.

    Looks up the uuid's type prefix (second dash-separated field) in
    the API discovery schema and returns the matching class name, such
    as 'Collection' or 'PipelineInstance'.  A bare Keep locator hash is
    treated as a 'Collection' (per the header comment above).

    NOTE(review): the return statements and the extraction of
    type_prefix appear to be missing from this excerpt — confirm
    against the full source.
    """
    # A 32-hex-digit md5 plus size (a Keep locator) identifies a
    # collection by content rather than by uuid.
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):

    p = object_uuid.split('-')

    # Scan the schema for the class whose uuidPrefix matches.
    for k in api._schema.schemas:
        obj_class = api._schema.schemas[k].get('uuidPrefix', None)
        if type_prefix == obj_class:
def abort(msg, code=1):
    """Log msg and terminate the program with exit status `code`.

    NOTE(review): the exit(code) call appears to be missing from this
    excerpt — confirm against the full source.
    """
    logger.info("arv-copy: %s", msg)
935 # Code for reporting on the progress of a collection upload.
936 # Stolen from arvados.commands.put.ArvPutCollectionWriter
937 # TODO(twp): figure out how to refactor into a shared library
938 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
def machine_progress(obj_uuid, bytes_written, bytes_expected):
    """Machine-readable one-line progress report; -1 when the expected
    total is unknown.

    NOTE(review): most of the format arguments appear to be missing
    from this excerpt — confirm against the full source.
    """
    return "{} {}: {} {} written {} total\n".format(
        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
    """Human-friendly progress line: MiB counts and percent complete
    when the expected total is known, otherwise just raw bytes written.

    NOTE(review): the if/else lines selecting between the two returns,
    and one format argument, appear to be missing from this excerpt —
    confirm against the full source.
    """
    return "\r{}: {}M / {}M {:.1%} ".format(
        bytes_written >> 20, bytes_expected >> 20,
        float(bytes_written) / bytes_expected)

    return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter(object):
    """Write progress reports using a caller-supplied formatting function.

    NOTE(review): the outfile attribute initialization, part of
    report()'s write call, and the finish() method header appear to be
    missing from this excerpt — confirm against the full source.
    """
    # Formatting callback: (obj_uuid, bytes_written, bytes_expected) -> str.
    _progress_func = None

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        # A None progress_func silently disables reporting.
        if self._progress_func is not None:
            self._progress_func(obj_uuid, bytes_written, bytes_expected))

        self.outfile.write("\n")
973 if __name__ == '__main__':