# arv-copy [--recursive] [--no-recursive] --src src --dst dst object-uuid
#
# Copies an object from Arvados instance src to instance dst.
#
# By default, arv-copy recursively copies any dependent objects
# necessary to make the object functional in the new instance
# (e.g. for a pipeline instance, arv-copy copies the pipeline
# template, input collections, docker images, and git repositories).
# If --no-recursive is given, arv-copy copies only the single record
# identified by object-uuid.
#
# The user must have files $HOME/.config/arvados/{src}.conf and
# $HOME/.config/arvados/{dst}.conf with valid login credentials for
# instances src and dst.  If either of these files is not found,
# arv-copy will issue an error.
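#
# Example (hypothetical instance names and UUIDs):
#
#   arv-copy --src zzzzz --dst qqqqq \
#       --project-uuid qqqqq-j7d0g-xxxxxxxxxxxxxxx \
#       zzzzz-4zz18-xxxxxxxxxxxxxxx
#
# reads credentials from $HOME/.config/arvados/zzzzz.conf and
# $HOME/.config/arvados/qqqqq.conf, then copies collection
# zzzzz-4zz18-xxxxxxxxxxxxxxx into the given destination project.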
from __future__ import division
from future import standard_library
from future.utils import listvalues
standard_library.install_aliases()
from past.builtins import basestring
from builtins import object
import argparse
import contextlib
import logging
import os
import re
import shutil
import sys
import tempfile
import urllib.parse

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
import ruamel.yaml as yaml

from arvados.api import OrderedJsonModel
from arvados._version import __version__
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')
# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# local directories they have been mapped,
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
local_repo_dir = {}

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
src_owner_uuid = None
def main():
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--force-filters', action='store_true', default=False,
        help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
    copy_opts.add_argument(
        '--src', dest='source_arvados', required=True,
        help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--dst', dest='destination_arvados', required=True,
        help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
    copy_opts.add_argument(
        '--dst-git-repo', dest='dst_git_repo',
        help='The name of the destination git repository. Required when copying a pipeline recursively.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the pipeline should be copied.')
    copy_opts.add_argument(
        '--allow-git-http-src', action="store_true",
        help='Allow cloning git repositories over insecure http')
    copy_opts.add_argument(
        '--allow-git-http-dst', action="store_true",
        help='Allow pushing git repositories over insecure http')

    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)

    parser = argparse.ArgumentParser(
        description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados)
    dst_arv = api_for_instance(args.destination_arvados)

    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)
    if t == 'Collection':
        set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
        result = copy_collection(args.object_uuid,
                                 src_arv, dst_arv,
                                 args)
    elif t == 'PipelineInstance':
        set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
        result = copy_pipeline_instance(args.object_uuid,
                                        src_arv, dst_arv,
                                        args)
    elif t == 'PipelineTemplate':
        set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
        result = copy_pipeline_template(args.object_uuid,
                                        src_arv, dst_arv, args)
    elif t == 'Workflow':
        set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
        result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
    else:
        abort("cannot copy object {} of type {}".format(args.object_uuid, t))

    # Clean up any outstanding temp git repositories.
    for d in listvalues(local_repo_dir):
        shutil.rmtree(d, ignore_errors=True)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if 'error_token' in result or 'uuid' not in result:
        logger.error("API server returned an error result: {}".format(result))
        exit(1)

    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)
def set_src_owner_uuid(resource, uuid, args):
    global src_owner_uuid
    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = c.get("owner_uuid")
# api_for_instance(instance_name)
#
#     Creates an API client for the Arvados instance identified by
#     instance_name.
#
#     If instance_name contains a slash, it is presumed to be a path
#     (either relative or absolute) to a file with Arvados configuration
#     settings.
#
#     Otherwise, it is presumed to be the name of a file in
#     $HOME/.config/arvados/instance_name.conf
#
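# A minimal config file might look like this (host and token are
# placeholders):
#
#     ARVADOS_API_HOST=zzzzz.arvadosapi.com
#     ARVADOS_API_TOKEN=0123456789abcdefghijklmnopqrstuvwxyz
#     ARVADOS_API_HOST_INSECURE=false
#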
def api_for_instance(instance_name):
    if '/' in instance_name:
        config_file = instance_name
    else:
        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    try:
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        api_is_insecure = (
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             model=OrderedJsonModel())
    else:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
    return client
# Check if git is available
def check_git_availability():
    try:
        arvados.util.run_command(['git', '--help'])
    except Exception:
        abort('git command is not available. Please ensure git is installed.')
# copy_pipeline_instance(pi_uuid, src, dst, args)
#
#    Copies a pipeline instance identified by pi_uuid from src to dst.
#
#    If the args.recursive option is set:
#      1. Copies all input collections
#           * For each component in the pipeline, include all collections
#             listed as job dependencies for that component
#      2. Copy docker images
#      3. Copy git repositories
#      4. Copy the pipeline template
#
#    The only changes made to the copied pipeline instance are:
#      1. The original pipeline instance UUID is preserved in
#         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
#      2. The pipeline_template_uuid is changed to the new template uuid.
#      3. The owner_uuid of the instance is changed to the user who
#         copied it.
#
def copy_pipeline_instance(pi_uuid, src, dst, args):
    # Fetch the pipeline instance record.
    pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)

    if args.recursive:
        check_git_availability()

        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy the pipeline template and save the copied template.
        if pi.get('pipeline_template_uuid', None):
            pt = copy_pipeline_template(pi['pipeline_template_uuid'],
                                        src, dst, args)

        # Copy input collections, docker images and git repos.
        pi = copy_collections(pi, src, dst, args)
        copy_git_repos(pi, src, dst, args.dst_git_repo, args)
        copy_docker_images(pi, src, dst, args)

        # Update the fields of the pipeline instance with the copied
        # pipeline template.
        if pi.get('pipeline_template_uuid', None):
            pi['pipeline_template_uuid'] = pt['uuid']
    else:
        logger.info("Copying only pipeline instance %s.", pi_uuid)
        logger.info("You are responsible for making sure all pipeline dependencies have been updated.")

    # Update the pipeline instance properties, and create the new
    # instance at dst.
    pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
    pi['description'] = "Pipeline copied from {}\n\n{}".format(
        pi_uuid,
        pi['description'] if pi.get('description', None) else '')

    pi['owner_uuid'] = args.project_uuid

    new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
    return new_pi
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, basestring):
        return iter((arg,))
    else:
        return iter(arg)
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository.  If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")
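# For example (hypothetical repository names), the filter
# ['repository', '=', 'twp'] with src_repository 'twp' and
# dst_repository 'dst/twp' is rewritten in place to
# ['repository', '=', 'dst/twp'].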
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes.  If they're not, it raises ValueError to indicate that
    the filter is not portable.  It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    return any((name == 'any') or (name in attr_names)
               for name in filter_iter(filter_[0]))
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)
def migrate_components_filters(template_components, dst_git_repo):
    """Update template component filters in-place for the destination.

    template_components is a dictionary of components in a pipeline template.
    This method walks over each component's filters, and updates them to have
    identical semantics on the destination cluster.  It returns a list of
    error strings that describe what filters could not be updated safely.

    dst_git_repo is the name of the destination Git repository, which can
    be None if that is not known.
    """
    errors = []
    for cname, cspec in template_components.items():
        def add_error(errmsg):
            errors.append("{}: {}".format(cname, errmsg))
        if not isinstance(cspec, dict):
            add_error("value is not a component definition")
            continue
        src_repository = cspec.get('repository')
        filters = cspec.get('filters', [])
        if not isinstance(filters, list):
            add_error("filters are not a list")
            continue
        for cfilter in filters:
            if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
                add_error("malformed filter {!r}".format(cfilter))
                continue
            if attr_filtered(cfilter, 'repository'):
                with exception_handler(add_error, ValueError):
                    migrate_repository_filter(cfilter, src_repository, dst_git_repo)
            if attr_filtered(cfilter, 'script_version'):
                with exception_handler(add_error, ValueError):
                    migrate_script_version_filter(cfilter)
    return errors
# copy_pipeline_template(pt_uuid, src, dst, args)
#
#    Copies a pipeline template identified by pt_uuid from src to dst.
#
#    If args.recursive is True, also copy any collections, docker
#    images and git repositories that this template references.
#
#    The owner_uuid of the new template is changed to that of the user
#    who copied the template.
#
#    Returns the copied pipeline template object.
#
def copy_pipeline_template(pt_uuid, src, dst, args):
    # fetch the pipeline template from the source instance
    pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)

    if not args.force_filters:
        filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
        if filter_errors:
            abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
                  "\n".join(filter_errors))

    if args.recursive:
        check_git_availability()

        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy input collections, docker images and git repos.
        pt = copy_collections(pt, src, dst, args)
        copy_git_repos(pt, src, dst, args.dst_git_repo, args)
        copy_docker_images(pt, src, dst, args)

    pt['description'] = "Pipeline template copied from {}\n\n{}".format(
        pt_uuid,
        pt['description'] if pt.get('description', None) else '')
    pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)

    pt['owner_uuid'] = args.project_uuid

    return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
# copy_workflow(wf_uuid, src, dst, args)
#
#    Copies a workflow identified by wf_uuid from src to dst.
#
#    If args.recursive is True, also copy any collections
#    referenced in the workflow definition yaml.
#
#    The owner_uuid of the new workflow is set to the given
#    project_uuid, or to the user who copied the workflow.
#
#    Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    # copy collections and docker images
    if args.recursive:
        wf_def = yaml.safe_load(wf["definition"])
        if wf_def is not None:
            locations = []
            docker_images = {}
            graph = wf_def.get('$graph', None)
            if graph is not None:
                workflow_collections(graph, locations, docker_images)
            else:
                workflow_collections(wf_def, locations, docker_images)

            if locations:
                copy_collections(locations, src, dst, args)

            for image in docker_images:
                copy_docker_image(image, docker_images[image], src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid
    return dst.workflows().create(body=wf).execute(num_retries=args.retries)
def workflow_collections(obj, locations, docker_images):
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds) == 2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)
# copy_collections(obj, src, dst, args)
#
#    Recursively copies all collections referenced by 'obj' from src
#    to dst.  obj may be a dict or a list, in which case we run
#    copy_collections on every value it contains.  If it is a string,
#    search it for any substring that matches a collection hash or uuid
#    (this will find hidden references to collections like
#       "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
#    Returns a copy of obj with any old collection uuids replaced by
#    the new ones.
#
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj
def migrate_jobspec(jobspec, src, dst, dst_repo, args):
    """Copy a job's script to the destination repository, and update its record.

    Given a jobspec dictionary, this function finds the referenced script from
    src and copies it to dst and dst_repo.  It also updates jobspec in place to
    refer to names on the destination.
    """
    repo = jobspec.get('repository')
    if repo is None:
        return
    # script_version is the "script_version" parameter from the source
    # component or job.  If no script_version was supplied in the
    # component or job, it is a mistake in the pipeline, but for the
    # purposes of copying the repository, default to "master".
    script_version = jobspec.get('script_version') or 'master'
    script_key = (repo, script_version)
    if script_key not in scripts_copied:
        copy_git_repo(repo, src, dst, dst_repo, script_version, args)
        scripts_copied.add(script_key)
    jobspec['repository'] = dst_repo
    repo_dir = local_repo_dir[repo]
    for version_key in ['script_version', 'supplied_script_version']:
        if version_key in jobspec:
            jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
# copy_git_repos(p, src, dst, dst_repo, args)
#
#    Copies all git repositories referenced by pipeline instance or
#    template 'p' from src to dst.
#
#    For each component c in the pipeline:
#      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
#      * Rename script versions:
#          * c['script_version']
#          * c['job']['script_version']
#          * c['job']['supplied_script_version']
#        to the commit hashes they resolve to, since any symbolic
#        names (tags, branches) are not preserved in the destination repo.
#
#    The pipeline object is updated in place with the new repository
#    names.  The return value is undefined.
#
def copy_git_repos(p, src, dst, dst_repo, args):
    for component in p['components'].values():
        migrate_jobspec(component, src, dst, dst_repo, args)
        if 'job' in component:
            migrate_jobspec(component['job'], src, dst, dst_repo, args)
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes
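# For example, in the (hypothetical) manifest line
#
#     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
#
# the locator's "+3" size hint counts 3 bytes toward the total, and any
# later occurrence of the same block in the manifest is not counted again.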
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    del c['uuid']

    if not c.get("name"):
        c['name'] = "copied from " + collection_uuid

    if 'properties' in c:
        del c['properties']

    c['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
# copy_collection(obj_uuid, src, dst, args)
#
#    Copies the collection identified by obj_uuid from src to dst.
#    Returns the collection object created at dst.
#
#    If args.progress is True, produce a human-friendly progress
#    report.
#
#    If a collection with the desired portable_data_hash already
#    exists at dst, and args.force is False, copy_collection returns
#    the existing collection without copying any blocks.  Otherwise
#    (if no collection exists or if args.force is True)
#    copy_collection copies all of the collection data blocks from src
#    to dst.
#
#    For this application, it is critical to preserve the
#    collection's manifest hash, which is not guaranteed with the
#    arvados.CollectionReader and arvados.CollectionWriter classes.
#    Copying each block in the collection manually, followed by
#    the manifest block, ensures that the collection's manifest
#    hash will not change.
#
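# At its core, each data block is moved with a bare get/put pair
# (a sketch; the locator shown is hypothetical):
#
#     data = src_keep.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
#     dst_locator = dst_keep.put(data)
#
# The locator returned by dst_keep.put() is signed for the destination
# cluster and is what gets written into the new manifest.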
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be uniquely
        # identified with a particular collection.  As a result, it is
        # ambiguous as to what name to use for the copy.  Apply some heuristics
        # to pick which collection to get the name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
            ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a pipeline) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)
    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = ""
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest += words[0]
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest += ' ' + word
                continue
            blockhash = loc.md5sum
            # copy this block if we haven't seen it before
            # (otherwise, just reuse the existing dst_locator)
            if blockhash not in dst_locators:
                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                if progress_writer:
                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                data = src_keep.get(word)
                dst_locator = dst_keep.put(data)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
            dst_manifest += ' ' + dst_locators[blockhash]
        dst_manifest += "\n"

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)

    c['manifest_text'] = dst_manifest
    return create_collection_from(c, src, dst, args)
def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    priority = https_url + other_url + http_url

    git_config = []
    git_url = None

    for url in priority:
        if url.startswith("http"):
            u = urllib.parse.urlsplit(url)
            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
            git_config = ["-c", "credential.%s/.username=none" % baseurl,
                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
        else:
            git_config = []

        try:
            logger.debug("trying %s", url)
            arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
                                     env={"HOME": os.environ["HOME"],
                                          "ARVADOS_API_TOKEN": api.api_token,
                                          "GIT_ASKPASS": "/bin/false"})
        except arvados.errors.CommandFailedError:
            pass
        else:
            git_url = url
            break

    if not git_url:
        raise Exception('Cannot access git repository, tried {}'
                        .format(priority))

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        else:
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)
# copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
#
#    Copies commits from git repository 'src_git_repo' on Arvados
#    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
#    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
#    or "username/reponame").
#
#    All commits will be copied to a destination branch named for the
#    source repository URL.
#
#    The destination repository must already exist.
#
#    The user running this command must be authenticated
#    to both repositories.
#
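# For example (hypothetical URL), copying script_version "abc123" from
# "https://git.zzzzz.example/twp.git" pushes to a destination branch named
# "https_git_zzzzz_example_twp_git_abc123": each run of non-word
# characters in "{src_git_url}_{script_version}" is replaced with "_".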
def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
    # Identify the fetch and push URLs for the git repositories.

    (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
    (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")

    logger.debug('src_git_url: {}'.format(src_git_url))
    logger.debug('dst_git_url: {}'.format(dst_git_url))

    dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))

    # Copy git commits from src repo to dst repo.
    if src_git_repo not in local_repo_dir:
        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
        arvados.util.run_command(
            ["git"] + src_git_config + ["clone", "--bare", src_git_url,
             local_repo_dir[src_git_repo]],
            cwd=os.path.dirname(local_repo_dir[src_git_repo]),
            env={"HOME": os.environ["HOME"],
                 "ARVADOS_API_TOKEN": src.api_token,
                 "GIT_ASKPASS": "/bin/false"})
        arvados.util.run_command(
            ["git", "remote", "add", "dst", dst_git_url],
            cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(
        ["git", "branch", dst_branch, script_version],
        cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
                             cwd=local_repo_dir[src_git_repo],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": dst.api_token,
                                  "GIT_ASKPASS": "/bin/false"})
def copy_docker_images(pipeline, src, dst, args):
    """Copy any docker images named in the pipeline components'
    runtime_constraints field from src to dst."""

    logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
    for c_name, c_info in pipeline['components'].items():
        if ('runtime_constraints' in c_info and
            'docker_image' in c_info['runtime_constraints']):
            copy_docker_image(
                c_info['runtime_constraints']['docker_image'],
                c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
                src, dst, args)
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst.  Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.
    """
    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
# git_rev_parse(rev, repo)
#
#    Returns the 40-character commit hash corresponding to 'rev' in
#    git repository 'repo' (which must be the path of a local git
#    repository).
#
def git_rev_parse(rev, repo):
    gitout, giterr = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return gitout.strip()
# uuid_type(api, object_uuid)
#
#    Returns the name of the class that object_uuid belongs to, based on
#    the second field of the uuid.  This function consults the api's
#    schema to identify the object class.
#
#    It returns a string such as 'Collection', 'PipelineInstance', etc.
#
#    Special case: if handed a Keep locator hash, return 'Collection'.
#
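# For example, given "zzzzz-4zz18-xxxxxxxxxxxxxxx" (a hypothetical
# collection UUID), the second field "4zz18" matches the uuidPrefix of
# the Collection schema, so uuid_type returns 'Collection'.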
def uuid_type(api, object_uuid):
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
        return 'Collection'
    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
def abort(msg, code=1):
    logger.info("arv-copy: %s", msg)
    exit(code)
# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)

def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
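# For example (hypothetical UUID),
#   human_progress('zzzzz-4zz18-xxxxxxxxxxxxxxx', 16 << 20, 32 << 20)
# returns "\rzzzzz-4zz18-xxxxxxxxxxxxxxx: 16M / 32M 50.0% "; the leading
# \r lets each report overwrite the previous one on the same terminal line.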
class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")
if __name__ == '__main__':
    main()