3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
5 # Copies an object from Arvados instance src to instance dst.
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst. If either of these files is not found,
17 # arv-copy will issue an error.
19 from __future__ import division
20 from future import standard_library
21 standard_library.install_aliases()
22 from past.builtins import basestring
23 from builtins import object
24 from past.utils import old_div
40 import arvados.commands._util as arv_cmd
41 import arvados.commands.keepdocker
42 import ruamel.yaml as yaml
44 from arvados.api import OrderedJsonModel
45 from arvados._version import __version__
# NOTE(review): this listing is an incomplete extract -- the numeric prefixes
# are original line numbers and they skip.  In particular the assignments for
# local_repo_dir (a dict) and src_owner_uuid, both referenced by the comments
# below and by later functions, are not visible here.  Verify against the
# upstream file before editing.
# Matches a full or abbreviated lowercase hex git commit hash.
47 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
49 logger = logging.getLogger('arvados.arv-copy')
51 # local_repo_dir records which git repositories from the Arvados source
52 # instance have been checked out locally during this run, and to which
54 # e.g. if repository 'twp' from src_arv has been cloned into
55 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
59 # List of collections that have been copied in this session, and their
60 # destination collection UUIDs.
61 collections_copied = {}
63 # Set of (repository, script_version) two-tuples of commits copied in git.
64 scripts_copied = set()
66 # The owner_uuid of the object being copied
# NOTE(review): this run of statements appears to be the body of the tool's
# main() entry point with the enclosing `def` line missing from this extract
# (the embedded original line numbers skip, e.g. 124 -> 127 around the
# verbose check).  Do not assume these run at import time -- confirm against
# the upstream file.
# Build the arv-copy-specific option group; arv_cmd.retry_opt is merged in
# below via argparse `parents=`.
70 copy_opts = argparse.ArgumentParser(add_help=False)
72 copy_opts.add_argument(
73 '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
74 help='Print version and exit.')
75 copy_opts.add_argument(
76 '-v', '--verbose', dest='verbose', action='store_true',
77 help='Verbose output.')
78 copy_opts.add_argument(
79 '--progress', dest='progress', action='store_true',
80 help='Report progress on copying collections. (default)')
81 copy_opts.add_argument(
82 '--no-progress', dest='progress', action='store_false',
83 help='Do not report progress on copying collections.')
84 copy_opts.add_argument(
85 '-f', '--force', dest='force', action='store_true',
86 help='Perform copy even if the object appears to exist at the remote destination.')
87 copy_opts.add_argument(
88 '--force-filters', action='store_true', default=False,
89 help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
90 copy_opts.add_argument(
91 '--src', dest='source_arvados', required=True,
92 help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
93 copy_opts.add_argument(
94 '--dst', dest='destination_arvados', required=True,
95 help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
96 copy_opts.add_argument(
97 '--recursive', dest='recursive', action='store_true',
98 help='Recursively copy any dependencies for this object. (default)')
99 copy_opts.add_argument(
100 '--no-recursive', dest='recursive', action='store_false',
101 help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
102 copy_opts.add_argument(
103 '--dst-git-repo', dest='dst_git_repo',
104 help='The name of the destination git repository. Required when copying a pipeline recursively.')
105 copy_opts.add_argument(
106 '--project-uuid', dest='project_uuid',
107 help='The UUID of the project at the destination to which the pipeline should be copied.')
108 copy_opts.add_argument(
109 '--allow-git-http-src', action="store_true",
110 help='Allow cloning git repositories over insecure http')
111 copy_opts.add_argument(
112 '--allow-git-http-dst', action="store_true",
113 help='Allow pushing git repositories over insecure http')
# NOTE(review): the positional argument name (presumably 'object_uuid',
# original line 116) is missing from this extract.
115 copy_opts.add_argument(
117 help='The UUID of the object to be copied.')
118 copy_opts.set_defaults(progress=True)
119 copy_opts.set_defaults(recursive=True)
121 parser = argparse.ArgumentParser(
122 description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
123 parents=[copy_opts, arv_cmd.retry_opt])
124 args = parser.parse_args()
# NOTE(review): the `if args.verbose:` / `else:` lines around the two
# setLevel calls are missing from this extract.
127 logger.setLevel(logging.DEBUG)
129 logger.setLevel(logging.INFO)
131 # Create API clients for the source and destination instances
132 src_arv = api_for_instance(args.source_arvados)
133 dst_arv = api_for_instance(args.destination_arvados)
# Default the destination project to the current user's home project.
135 if not args.project_uuid:
136 args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
138 # Identify the kind of object we have been given, and begin copying.
139 t = uuid_type(src_arv, args.object_uuid)
140 if t == 'Collection':
141 set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
142 result = copy_collection(args.object_uuid,
145 elif t == 'PipelineInstance':
146 set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
147 result = copy_pipeline_instance(args.object_uuid,
150 elif t == 'PipelineTemplate':
151 set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
152 result = copy_pipeline_template(args.object_uuid,
153 src_arv, dst_arv, args)
154 elif t == 'Workflow':
155 set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
156 result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
# NOTE(review): the final `else:` introducing this abort is missing here.
158 abort("cannot copy object {} of type {}".format(args.object_uuid, t))
160 # Clean up any outstanding temp git repositories.
161 for d in list(local_repo_dir.values()):
162 shutil.rmtree(d, ignore_errors=True)
164 # If no exception was thrown and the response does not have an
165 # error_token field, presume success
166 if 'error_token' in result or 'uuid' not in result:
167 logger.error("API server returned an error result: {}".format(result))
171 logger.info("Success: created copy with uuid {}".format(result['uuid']))
def set_src_owner_uuid(resource, uuid, args):
    """Remember the owner of the object being copied.

    Fetches the record identified by uuid from the given API resource
    and stores its owner_uuid in the module-global src_owner_uuid, used
    later to disambiguate collections that share a portable data hash.
    """
    global src_owner_uuid
    request = resource.get(uuid=uuid)
    record = request.execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
179 # api_for_instance(instance_name)
181 # Creates an API client for the Arvados instance identified by
184 # If instance_name contains a slash, it is presumed to be a path
185 # (either local or absolute) to a file with Arvados configuration
188 # Otherwise, it is presumed to be the name of a file in
189 # $HOME/.config/arvados/instance_name.conf
# NOTE(review): several lines of this function are missing from this
# extract (the `else:` before the config_file join, the `try:` before
# arvados.config.load, the api_is_insecure assignment head, and the
# `return client` / trailing `else:` lines).  Verify against upstream.
191 def api_for_instance(instance_name):
192 if '/' in instance_name:
193 config_file = instance_name
195 config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
198 cfg = arvados.config.load(config_file)
199 except (IOError, OSError) as e:
200 abort(("Could not open config file {}: {}\n" +
201 "You must make sure that your configuration tokens\n" +
202 "for Arvados instance {} are in {} and that this\n" +
203 "file is readable.").format(
204 config_file, e, instance_name, config_file))
# Build the client only when both required settings are present.
206 if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
208 cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
209 ['1', 't', 'true', 'y', 'yes']))
210 client = arvados.api('v1',
211 host=cfg['ARVADOS_API_HOST'],
212 token=cfg['ARVADOS_API_TOKEN'],
213 insecure=api_is_insecure,
214 model=OrderedJsonModel())
216 abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
# Check if git is available
def check_git_availability():
    """Abort with a helpful message unless the git CLI can be invoked.

    The extract showed abort() being called unconditionally after the
    run_command probe; the probe must be guarded so that a working git
    does not trigger the abort.
    """
    try:
        # Any successful invocation proves the binary is on PATH.
        arvados.util.run_command(['git', '--help'])
    except Exception:
        abort('git command is not available. Please ensure git is installed.')
226 # copy_pipeline_instance(pi_uuid, src, dst, args)
228 # Copies a pipeline instance identified by pi_uuid from src to dst.
230 # If the args.recursive option is set:
231 # 1. Copies all input collections
232 # * For each component in the pipeline, include all collections
233 # listed as job dependencies for that component)
234 # 2. Copy docker images
235 # 3. Copy git repositories
236 # 4. Copy the pipeline template
238 # The only changes made to the copied pipeline instance are:
239 # 1. The original pipeline instance UUID is preserved in
240 # the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
241 # 2. The pipeline_template_uuid is changed to the new template uuid.
242 # 3. The owner_uuid of the instance is changed to the user who
# NOTE(review): interior lines are missing from this extract (e.g. the
# `if args.recursive:` guard before check_git_availability, the
# continuation arguments of the copy_pipeline_template call, the `else:`
# before the two logger.info calls, and the final `return new_pi`).
245 def copy_pipeline_instance(pi_uuid, src, dst, args):
246 # Fetch the pipeline instance record.
247 pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
250 check_git_availability()
252 if not args.dst_git_repo:
253 abort('--dst-git-repo is required when copying a pipeline recursively.')
254 # Copy the pipeline template and save the copied template.
255 if pi.get('pipeline_template_uuid', None):
256 pt = copy_pipeline_template(pi['pipeline_template_uuid'],
259 # Copy input collections, docker images and git repos.
260 pi = copy_collections(pi, src, dst, args)
261 copy_git_repos(pi, src, dst, args.dst_git_repo, args)
262 copy_docker_images(pi, src, dst, args)
264 # Update the fields of the pipeline instance with the copied
266 if pi.get('pipeline_template_uuid', None):
267 pi['pipeline_template_uuid'] = pt['uuid']
271 logger.info("Copying only pipeline instance %s.", pi_uuid)
272 logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
274 # Update the pipeline instance properties, and create the new
276 pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
277 pi['description'] = "Pipeline copied from {}\n\n{}".format(
279 pi['description'] if pi.get('description', None) else '')
281 pi['owner_uuid'] = args.project_uuid
285 new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    # A bare string is treated as a one-element list; anything else is
    # assumed to be iterable and its elements are yielded unchanged.
    if isinstance(arg, basestring):
        yield arg
    else:
        for elem in arg:
            yield elem
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository. If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        # Simple equality test: point it at the destination repo.
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        # Single-element 'in' test: same treatment, keeping list form.
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")
# NOTE(review): the blank line and closing triple-quote of the docstring
# are missing from this extract; only comment/doc lines were lost, the
# two code lines below appear complete.
317 def migrate_script_version_filter(version_filter):
318 """Update a single script_version filter in-place for the destination.
320 Currently this function checks that all the filter operands are Git
321 commit hashes. If they're not, it raises ValueError to indicate that
322 the filter is not portable. It could be extended to make other
323 transformations in the future.
# Every operand must look like a (possibly abbreviated) commit hash;
# symbolic names would not resolve on the destination cluster.
325 if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
326 raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Report whether filter_ applies to any of attr_names.

    A filter whose field is the special value 'any' matches every
    attribute; otherwise the field must be one of the given names.
    """
    for field in filter_iter(filter_[0]):
        if (field == 'any') or (field in attr_names):
            return True
    return False
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    # The extract showed a dangling `except exc_types as error:` with no
    # preceding try/yield; restore the standard contextmanager shape.
    try:
        yield
    except exc_types as error:
        handler(error)
# NOTE(review): interior lines are missing from this extract -- notably
# the docstring's closing quotes, the `errors = []` initialization, the
# `continue` statements after each add_error call, and the final
# `return errors`.  Verify against upstream before editing.
341 def migrate_components_filters(template_components, dst_git_repo):
342 """Update template component filters in-place for the destination.
344 template_components is a dictionary of components in a pipeline template.
345 This method walks over each component's filters, and updates them to have
346 identical semantics on the destination cluster. It returns a list of
347 error strings that describe what filters could not be updated safely.
349 dst_git_repo is the name of the destination Git repository, which can
350 be None if that is not known.
353 for cname, cspec in template_components.items():
# Closure that prefixes each error message with the component name.
354 def add_error(errmsg):
355 errors.append("{}: {}".format(cname, errmsg))
356 if not isinstance(cspec, dict):
357 add_error("value is not a component definition")
359 src_repository = cspec.get('repository')
360 filters = cspec.get('filters', [])
361 if not isinstance(filters, list):
362 add_error("filters are not a list")
364 for cfilter in filters:
365 if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
366 add_error("malformed filter {!r}".format(cfilter))
# Rewrite repository filters and validate script_version filters;
# exception_handler converts ValueError into a recorded error string.
368 if attr_filtered(cfilter, 'repository'):
369 with exception_handler(add_error, ValueError):
370 migrate_repository_filter(cfilter, src_repository, dst_git_repo)
371 if attr_filtered(cfilter, 'script_version'):
372 with exception_handler(add_error, ValueError):
373 migrate_script_version_filter(cfilter)
376 # copy_pipeline_template(pt_uuid, src, dst, args)
378 # Copies a pipeline template identified by pt_uuid from src to dst.
380 # If args.recursive is True, also copy any collections, docker
381 # images and git repositories that this template references.
383 # The owner_uuid of the new template is changed to that of the user
384 # who copied the template.
386 # Returns the copied pipeline template object.
# NOTE(review): interior lines are missing from this extract (e.g. the
# `if filter_errors:` guard before the abort, the `if args.recursive:`
# guard before check_git_availability, and part of the description
# format arguments).  Verify against upstream before editing.
388 def copy_pipeline_template(pt_uuid, src, dst, args):
389 # fetch the pipeline template from the source instance
390 pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
392 if not args.force_filters:
393 filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
395 abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
396 "\n".join(filter_errors))
399 check_git_availability()
401 if not args.dst_git_repo:
402 abort('--dst-git-repo is required when copying a pipeline recursively.')
403 # Copy input collections, docker images and git repos.
404 pt = copy_collections(pt, src, dst, args)
405 copy_git_repos(pt, src, dst, args.dst_git_repo, args)
406 copy_docker_images(pt, src, dst, args)
# Annotate the copy with its provenance and give it to the target project.
408 pt['description'] = "Pipeline template copied from {}\n\n{}".format(
410 pt['description'] if pt.get('description', None) else '')
411 pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
414 pt['owner_uuid'] = args.project_uuid
416 return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
418 # copy_workflow(wf_uuid, src, dst, args)
420 # Copies a workflow identified by wf_uuid from src to dst.
422 # If args.recursive is True, also copy any collections
423 # referenced in the workflow definition yaml.
425 # The owner_uuid of the new workflow is set to any given
426 # project_uuid or the user who copied the template.
428 # Returns the copied workflow object.
# NOTE(review): interior lines are missing from this extract (e.g. the
# initialization of `locations` and `docker_images`, the `if args.recursive:`
# guard, and the `else:` between the two workflow_collections calls).
430 def copy_workflow(wf_uuid, src, dst, args):
431 # fetch the workflow from the source instance
432 wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
434 # copy collections and docker images
# The definition is stored as YAML/JSON text; parse it to walk for
# keep references and docker image names.
436 wf_def = yaml.safe_load(wf["definition"])
437 if wf_def is not None:
440 graph = wf_def.get('$graph', None)
441 if graph is not None:
442 workflow_collections(graph, locations, docker_images)
444 workflow_collections(wf_def, locations, docker_images)
447 copy_collections(locations, src, dst, args)
449 for image in docker_images:
450 copy_docker_image(image, docker_images[image], src, dst, args)
452 # copy the workflow itself
454 wf['owner_uuid'] = args.project_uuid
455 return dst.workflows().create(body=wf).execute(num_retries=args.retries)
# Recursively walk a parsed workflow definition, appending any keep:
# locations found to `locations` and recording docker image name->tag
# pairs in `docker_images`.  Both output arguments are mutated in place.
# NOTE(review): interior lines are missing from this extract (the
# `if loc is not None:` guard and the `for x in ...:` loop headers that
# drive the recursive calls).  Verify against upstream before editing.
457 def workflow_collections(obj, locations, docker_images):
458 if isinstance(obj, dict):
459 loc = obj.get('location', None)
461 if loc.startswith("keep:"):
# Strip the "keep:" scheme prefix; record the bare PDH/UUID.
462 locations.append(loc[5:])
464 docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
465 if docker_image is not None:
# Split "name:tag"; an image with no tag defaults to 'latest'.
466 ds = docker_image.split(":", 1)
467 tag = ds[1] if len(ds)==2 else 'latest'
468 docker_images[ds[0]] = tag
471 workflow_collections(obj[x], locations, docker_images)
472 elif isinstance(obj, list):
474 workflow_collections(x, locations, docker_images)
476 # copy_collections(obj, src, dst, args)
478 # Recursively copies all collections referenced by 'obj' from src
479 # to dst. obj may be a dict or a list, in which case we run
480 # copy_collections on every value it contains. If it is a string,
481 # search it for any substring that matches a collection hash or uuid
482 # (this will find hidden references to collections like
483 # "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
485 # Returns a copy of obj with any old collection uuids replaced by
# NOTE(review): interior lines are missing from this extract (the inner
# docstring's closing quotes, an `else:` in the caching branch, the
# dict-comprehension's `for` clause, and the fall-through `return obj`).
488 def copy_collections(obj, src, dst, args):
490 def copy_collection_fn(collection_match):
491 """Helper function for regex substitution: copies a single collection,
492 identified by the collection_match MatchObject, to the
493 destination. Returns the destination collection uuid (or the
494 portable data hash if that's what src_id is).
497 src_id = collection_match.group(0)
# Memoize per-session in collections_copied so each collection is
# copied at most once even if referenced many times.
498 if src_id not in collections_copied:
499 dst_col = copy_collection(src_id, src, dst, args)
500 if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
501 collections_copied[src_id] = src_id
503 collections_copied[src_id] = dst_col['uuid']
504 return collections_copied[src_id]
506 if isinstance(obj, basestring):
507 # Copy any collections identified in this string to dst, replacing
508 # them with the dst uuids as necessary.
509 obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
510 obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
512 elif isinstance(obj, dict):
513 return type(obj)((v, copy_collections(obj[v], src, dst, args))
515 elif isinstance(obj, list):
516 return type(obj)(copy_collections(v, src, dst, args) for v in obj)
# NOTE(review): interior lines are missing from this extract (the
# docstring's closing quotes and, presumably, an early return when the
# jobspec names no repository).  Verify against upstream before editing.
519 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
520 """Copy a job's script to the destination repository, and update its record.
522 Given a jobspec dictionary, this function finds the referenced script from
523 src and copies it to dst and dst_repo. It also updates jobspec in place to
524 refer to names on the destination.
526 repo = jobspec.get('repository')
529 # script_version is the "script_version" parameter from the source
530 # component or job. If no script_version was supplied in the
531 # component or job, it is a mistake in the pipeline, but for the
532 # purposes of copying the repository, default to "master".
533 script_version = jobspec.get('script_version') or 'master'
534 script_key = (repo, script_version)
# scripts_copied memoizes (repo, version) pairs so each commit is
# pushed to the destination at most once per session.
535 if script_key not in scripts_copied:
536 copy_git_repo(repo, src, dst, dst_repo, script_version, args)
537 scripts_copied.add(script_key)
538 jobspec['repository'] = dst_repo
539 repo_dir = local_repo_dir[repo]
# Symbolic names (tags/branches) are not preserved in the destination
# repo, so pin every version field to its resolved commit hash.
540 for version_key in ['script_version', 'supplied_script_version']:
541 if version_key in jobspec:
542 jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
# copy_git_repos(p, src, dst, dst_repo, args)
#
# Copies all git repositories referenced by pipeline instance or
# template 'p' from src to dst, and pins symbolic script versions
# (c['script_version'], c['job']['script_version'],
# c['job']['supplied_script_version']) to the commit hashes they
# resolve to, since tags/branches are not preserved in the
# destination repo.  'p' is updated in place; the return value is
# undefined.
def copy_git_repos(p, src, dst, dst_repo, args):
    for cspec in p['components'].values():
        migrate_jobspec(cspec, src, dst, dst_repo, args)
        if 'job' in cspec:
            migrate_jobspec(cspec['job'], src, dst, dst_repo, args)
# NOTE(review): interior lines are missing from this extract (the
# initialization of total_bytes/locators_seen, the `words = line.split()`
# line, the try/except around KeepLocator, and the final
# `return total_bytes`).  Verify against upstream before editing.
567 def total_collection_size(manifest_text):
568 """Return the total number of bytes in this collection (excluding
569 duplicate blocks)."""
573 for line in manifest_text.splitlines():
# Skip words[0] (the stream name); remaining words are locators or
# file tokens.
575 for word in words[1:]:
577 loc = arvados.KeepLocator(word)
579 continue # this word isn't a locator, skip it
# Count each distinct block once, keyed by its md5 hash.
580 if loc.md5sum not in locators_seen:
581 locators_seen[loc.md5sum] = True
582 total_bytes += loc.size
# NOTE(review): interior lines are missing from this extract (the
# docstring's closing quotes, the guard around the default-name
# assignment, and the properties-handling lines).  Verify against
# upstream before editing.
586 def create_collection_from(c, src, dst, args):
587 """Create a new collection record on dst, and copy Docker metadata if
590 collection_uuid = c['uuid']
594 c['name'] = "copied from " + collection_uuid
596 if 'properties' in c:
599 c['owner_uuid'] = args.project_uuid
601 dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
603 # Create docker_image_repo+tag and docker_image_hash links
604 # at the destination.
605 for link_class in ("docker_image_repo+tag", "docker_image_hash"):
606 docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
# Re-create each source link against the new collection, owned by
# the destination project.
608 for src_link in docker_links:
609 body = {key: src_link[key]
610 for key in ['link_class', 'name', 'properties']}
611 body['head_uuid'] = dst_collection['uuid']
612 body['owner_uuid'] = args.project_uuid
614 lk = dst.links().create(body=body).execute(num_retries=args.retries)
615 logger.debug('created dst link {}'.format(lk))
617 return dst_collection
619 # copy_collection(obj_uuid, src, dst, args)
621 # Copies the collection identified by obj_uuid from src to dst.
622 # Returns the collection object created at dst.
624 # If args.progress is True, produce a human-friendly progress
627 # If a collection with the desired portable_data_hash already
628 # exists at dst, and args.force is False, copy_collection returns
629 # the existing collection without copying any blocks. Otherwise
630 # (if no collection exists or if args.force is True)
631 # copy_collection copies all of the collection data blocks from src
634 # For this application, it is critical to preserve the
635 # collection's manifest hash, which is not guaranteed with the
636 # arvados.CollectionReader and arvados.CollectionWriter classes.
637 # Copying each block in the collection manually, followed by
638 # the manifest block, ensures that the collection's manifest
639 # hash will not change.
# NOTE(review): this function is heavily truncated in this extract --
# the original line numbers skip repeatedly (e.g. the branch bodies of
# the PDH-disambiguation heuristics, the `else:` introducing the direct
# uuid fetch, the force/early-return logic, and the initialization of
# dst_manifest / dst_locators / bytes_written are all missing).  The
# statement ordering here is critical to preserving the manifest hash;
# verify against upstream before any edit.
641 def copy_collection(obj_uuid, src, dst, args):
642 if arvados.util.keep_locator_pattern.match(obj_uuid):
643 # If the obj_uuid is a portable data hash, it might not be uniquely
644 # identified with a particular collection. As a result, it is
645 # ambigious as to what name to use for the copy. Apply some heuristics
646 # to pick which collection to get the name from.
647 srccol = src.collections().list(
648 filters=[['portable_data_hash', '=', obj_uuid]],
649 order="created_at asc"
650 ).execute(num_retries=args.retries)
652 items = srccol.get("items")
655 logger.warning("Could not find collection with portable data hash %s", obj_uuid)
661 # There's only one collection with the PDH, so use that.
664 # See if there is a collection that's in the same project
665 # as the root item (usually a pipeline) being copied.
667 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
671 # Didn't find any collections located in the same project, so
672 # pick the oldest collection that has a name assigned to it.
678 # None of the collections have names (?!), so just pick the
682 # list() doesn't return manifest text (and we don't want it to,
683 # because we don't need the same maninfest text sent to us 50
684 # times) so go and retrieve the collection object directly
685 # which will include the manifest text.
686 c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
688 # Assume this is an actual collection uuid, so fetch it directly.
689 c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
691 # If a collection with this hash already exists at the
692 # destination, and 'force' is not true, just return that
695 if 'portable_data_hash' in c:
696 colhash = c['portable_data_hash']
699 dstcol = dst.collections().list(
700 filters=[['portable_data_hash', '=', colhash]]
701 ).execute(num_retries=args.retries)
702 if dstcol['items_available'] > 0:
703 for d in dstcol['items']:
704 if ((args.project_uuid == d['owner_uuid']) and
705 (c.get('name') == d['name']) and
706 (c['portable_data_hash'] == d['portable_data_hash'])):
708 c['manifest_text'] = dst.collections().get(
709 uuid=dstcol['items'][0]['uuid']
710 ).execute(num_retries=args.retries)['manifest_text']
711 return create_collection_from(c, src, dst, args)
713 # Fetch the collection's manifest.
714 manifest = c['manifest_text']
715 logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
717 # Copy each block from src_keep to dst_keep.
718 # Use the newly signed locators returned from dst_keep to build
719 # a new manifest as we go.
720 src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
721 dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
725 bytes_expected = total_collection_size(manifest)
727 progress_writer = ProgressWriter(human_progress)
729 progress_writer = None
731 for line in manifest.splitlines():
733 dst_manifest += words[0]
734 for word in words[1:]:
736 loc = arvados.KeepLocator(word)
738 # If 'word' can't be parsed as a locator,
739 # presume it's a filename.
740 dst_manifest += ' ' + word
742 blockhash = loc.md5sum
743 # copy this block if we haven't seen it before
744 # (otherwise, just reuse the existing dst_locator)
745 if blockhash not in dst_locators:
746 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
748 progress_writer.report(obj_uuid, bytes_written, bytes_expected)
749 data = src_keep.get(word)
750 dst_locator = dst_keep.put(data)
751 dst_locators[blockhash] = dst_locator
752 bytes_written += loc.size
753 dst_manifest += ' ' + dst_locators[blockhash]
757 progress_writer.report(obj_uuid, bytes_written, bytes_expected)
758 progress_writer.finish()
760 # Copy the manifest and save the collection.
761 logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
763 c['manifest_text'] = dst_manifest
764 return create_collection_from(c, src, dst, args)
# Pick a usable clone URL for the named Arvados-hosted git repository,
# preferring https, then non-http schemes (e.g. ssh), then plain http.
# Each candidate is probed with `git ls-remote`; plain-http URLs are
# only accepted when the corresponding --allow-git-http-* flag was
# given.  Returns a (git_url, git_config) tuple, where git_config is a
# list of extra `git -c ...` arguments supplying credentials over http.
# NOTE(review): interior lines are missing from this extract (the loop
# header over `priority`, the `try:`/success bookkeeping around the
# ls-remote probe, and the no-URL-found error path).  Verify against
# upstream before editing.
766 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
767 r = api.repositories().list(
768 filters=[['name', '=', repo_name]]).execute(num_retries=retries)
769 if r['items_available'] != 1:
770 raise Exception('cannot identify repo {}; {} repos found'
771 .format(repo_name, r['items_available']))
773 https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
774 http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
775 other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
777 priority = https_url + other_url + http_url
782 if url.startswith("http"):
783 u = urllib.parse.urlsplit(url)
784 baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
# Feed the API token as the git password via an inline credential
# helper, so the clone works without interactive prompts.
785 git_config = ["-c", "credential.%s/.username=none" % baseurl,
786 "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
791 logger.debug("trying %s", url)
792 arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
793 env={"HOME": os.environ["HOME"],
794 "ARVADOS_API_TOKEN": api.api_token,
795 "GIT_ASKPASS": "/bin/false"})
796 except arvados.errors.CommandFailedError:
803 raise Exception('Cannot access git repository, tried {}'
806 if git_url.startswith("http:"):
807 if allow_insecure_http:
808 logger.warn("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
810 raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
812 return (git_url, git_config)
815 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
817 # Copies commits from git repository 'src_git_repo' on Arvados
818 # instance 'src' to 'dst_git_repo' on 'dst'. Both src_git_repo
819 # and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
822 # All commits will be copied to a destination branch named for the
823 # source repository URL.
825 # The destination repository must already exist.
827 # The user running this command must be authenticated
828 # to both repositories.
# NOTE(review): a few original lines are absent from this extract
# around the comments below (they appear to be blank/comment lines,
# but confirm against upstream).
830 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
831 # Identify the fetch and push URLs for the git repositories.
833 (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
834 (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
836 logger.debug('src_git_url: {}'.format(src_git_url))
837 logger.debug('dst_git_url: {}'.format(dst_git_url))
# Branch name is derived from the source URL + version with all
# non-word characters collapsed to underscores, so it is a valid ref.
839 dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
841 # Copy git commits from src repo to dst repo.
# Clone each source repo at most once per run, into a temp dir that
# main() removes at exit (tracked in local_repo_dir).
842 if src_git_repo not in local_repo_dir:
843 local_repo_dir[src_git_repo] = tempfile.mkdtemp()
844 arvados.util.run_command(
845 ["git"] + src_git_config + ["clone", "--bare", src_git_url,
846 local_repo_dir[src_git_repo]],
847 cwd=os.path.dirname(local_repo_dir[src_git_repo]),
848 env={"HOME": os.environ["HOME"],
849 "ARVADOS_API_TOKEN": src.api_token,
850 "GIT_ASKPASS": "/bin/false"})
851 arvados.util.run_command(
852 ["git", "remote", "add", "dst", dst_git_url],
853 cwd=local_repo_dir[src_git_repo])
854 arvados.util.run_command(
855 ["git", "branch", dst_branch, script_version],
856 cwd=local_repo_dir[src_git_repo])
857 arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
858 cwd=local_repo_dir[src_git_repo],
859 env={"HOME": os.environ["HOME"],
860 "ARVADOS_API_TOKEN": dst.api_token,
861 "GIT_ASKPASS": "/bin/false"})
# NOTE(review): the call to copy_docker_image() that should open after
# the runtime_constraints check is truncated in this extract -- only its
# argument lines are visible.  Verify against upstream before editing.
863 def copy_docker_images(pipeline, src, dst, args):
864 """Copy any docker images named in the pipeline components'
865 runtime_constraints field from src to dst."""
867 logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
868 for c_name, c_info in pipeline['components'].items():
869 if ('runtime_constraints' in c_info and
870 'docker_image' in c_info['runtime_constraints']):
# Tag defaults to 'latest' when docker_image_tag is absent.
872 c_info['runtime_constraints']['docker_image'],
873 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
# NOTE(review): interior lines are missing from this extract (the
# docstring's closing quotes and the final `else:` before the
# could-not-find warning).  Verify against upstream before editing.
877 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
878 """Copy the docker image identified by docker_image and
879 docker_image_tag from src to dst. Create appropriate
880 docker_image_repo+tag and docker_image_hash links at dst.
884 logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
886 # Find the link identifying this docker image.
887 docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
888 src, args.retries, docker_image, docker_image_tag)
889 if docker_image_list:
890 image_uuid, image_info = docker_image_list[0]
891 logger.debug('copying collection {} {}'.format(image_uuid, image_info))
893 # Copy the collection it refers to.
# create_collection_from() (called inside copy_collection) recreates
# the docker_image_* links at the destination.
894 dst_image_col = copy_collection(image_uuid, src, dst, args)
895 elif arvados.util.keep_locator_pattern.match(docker_image):
# The "image name" is actually a raw Keep locator; copy it directly.
896 dst_image_col = copy_collection(docker_image, src, dst, args)
898 logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
# git_rev_parse(rev, repo)
def git_rev_parse(rev, repo):
    """Resolve rev to its 40-character commit hash.

    repo must be the path of a local git repository in which rev
    (a branch, tag, or abbreviated hash) can be resolved.
    """
    stdout, _stderr = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return stdout.strip()
911 # uuid_type(api, object_uuid)
913 # Returns the name of the class that object_uuid belongs to, based on
914 # the second field of the uuid. This function consults the api's
915 # schema to identify the object class.
917 # It returns a string such as 'Collection', 'PipelineInstance', etc.
919 # Special case: if handed a Keep locator hash, return 'Collection'.
# NOTE(review): interior lines are missing from this extract (the early
# `return 'Collection'` for the locator case, the assignment of
# `type_prefix` from the split uuid, and the return statements at the
# end of the schema scan).  Verify against upstream before editing.
921 def uuid_type(api, object_uuid):
# A bare "md5+size[+hints]" Keep locator is treated as a Collection.
922 if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
924 p = object_uuid.split('-')
# Match the uuid's middle field against each schema's uuidPrefix.
927 for k in api._schema.schemas:
928 obj_class = api._schema.schemas[k].get('uuidPrefix', None)
929 if type_prefix == obj_class:
def abort(msg, code=1):
    """Log an arv-copy error message and terminate with the given exit status.

    The extract showed no exit after the log call, which would let
    execution continue past a fatal error; sys.exit restores the
    abort semantics (raises SystemExit with `code`).
    """
    logger.info("arv-copy: %s", msg)
    sys.exit(code)
938 # Code for reporting on the progress of a collection upload.
939 # Stolen from arvados.commands.put.ArvPutCollectionWriter
940 # TODO(twp): figure out how to refactor into a shared library
941 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
# Machine-readable progress line; a None bytes_expected is reported
# as -1.  NOTE(review): the middle format arguments (original lines
# 946-949) are missing from this extract.
944 def machine_progress(obj_uuid, bytes_written, bytes_expected):
945 return "{} {}: {} {} written {} total\n".format(
950 -1 if (bytes_expected is None) else bytes_expected)
# Human-readable progress line, rewritten in place via a leading "\r".
# NOTE(review): the `if bytes_expected:` / `else:` lines that choose
# between the two return statements are missing from this extract.
952 def human_progress(obj_uuid, bytes_written, bytes_expected):
# With a known total: show MiB written / MiB expected and a percentage.
954 return "\r{}: {}M / {}M {:.1%} ".format(
956 bytes_written >> 20, bytes_expected >> 20,
957 old_div(float(bytes_written), bytes_expected))
# Without a total: just show the raw byte count.
959 return "\r{}: {} ".format(obj_uuid, bytes_written)
# Writes progress reports produced by a formatter function (see
# human_progress / machine_progress above).
# NOTE(review): interior lines are missing from this extract (the
# `outfile` attribute assignment, the write call inside report(), and
# the `def finish(self):` header for the final newline write).
961 class ProgressWriter(object):
962 _progress_func = None
965 def __init__(self, progress_func):
966 self._progress_func = progress_func
968 def report(self, obj_uuid, bytes_written, bytes_expected):
969 if self._progress_func is not None:
971 self._progress_func(obj_uuid, bytes_written, bytes_expected))
974 self.outfile.write("\n")
if __name__ == '__main__':
    # Entry point when run as a script; the guard in the extract had no
    # body (the main() call line was lost).
    main()