# arv-copy [--recursive] [--no-recursive] --src src --dst dst object-uuid
#
# Copies an object from Arvados instance src to instance dst.
#
# By default, arv-copy recursively copies any dependent objects
# necessary to make the object functional in the new instance
# (e.g. for a pipeline instance, arv-copy copies the pipeline
# template, input collections, docker images, git repositories). If
# --no-recursive is given, arv-copy copies only the single record
# identified by object-uuid.
#
# The user must have files $HOME/.config/arvados/{src}.conf and
# $HOME/.config/arvados/{dst}.conf with valid login credentials for
# instances src and dst. If either of these files is not found,
# arv-copy will issue an error.
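
# Example (hypothetical instance names and UUID, for illustration only):
#
#     arv-copy --src foo --dst bar zzzzz-4zz18-xxxxxxxxxxxxxxx
#
# This reads credentials from foo.conf and bar.conf under
# $HOME/.config/arvados/ and copies the collection, and by default its
# dependencies, into the destination user's home project.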

import argparse
import contextlib
import logging
import os
import re
import shutil
import sys
import tempfile
import urlparse

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
import ruamel.yaml as yaml

from arvados.api import OrderedJsonModel
from arvados._version import __version__

COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# directories.
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
#
local_repo_dir = {}

# Map of collections that have been copied in this session to their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
src_owner_uuid = None

def main():
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--force-filters', action='store_true', default=False,
        help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
    copy_opts.add_argument(
        '--src', dest='source_arvados', required=True,
        help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--dst', dest='destination_arvados', required=True,
        help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
    copy_opts.add_argument(
        '--dst-git-repo', dest='dst_git_repo',
        help='The name of the destination git repository. Required when copying a pipeline recursively.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the pipeline should be copied.')
    copy_opts.add_argument(
        '--allow-git-http-src', action="store_true",
        help='Allow cloning git repositories over insecure http')
    copy_opts.add_argument(
        '--allow-git-http-dst', action="store_true",
        help='Allow pushing git repositories over insecure http')

    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)

    parser = argparse.ArgumentParser(
        description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados)
    dst_arv = api_for_instance(args.destination_arvados)

    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)
    if t == 'Collection':
        set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
        result = copy_collection(args.object_uuid,
                                 src_arv, dst_arv,
                                 args)
    elif t == 'PipelineInstance':
        set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
        result = copy_pipeline_instance(args.object_uuid,
                                        src_arv, dst_arv,
                                        args)
    elif t == 'PipelineTemplate':
        set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
        result = copy_pipeline_template(args.object_uuid,
                                        src_arv, dst_arv, args)
    elif t == 'Workflow':
        set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
        result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
    else:
        abort("cannot copy object {} of type {}".format(args.object_uuid, t))

    # Clean up any outstanding temp git repositories.
    for d in local_repo_dir.values():
        shutil.rmtree(d, ignore_errors=True)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if 'error_token' in result or 'uuid' not in result:
        logger.error("API server returned an error result: {}".format(result))
        exit(1)

    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)

def set_src_owner_uuid(resource, uuid, args):
    global src_owner_uuid
    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = c.get("owner_uuid")

# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name.
#
# If instance_name contains a slash, it is presumed to be a path
# (either local or absolute) to a file with Arvados configuration
# credentials.
#
# Otherwise, it is presumed to be the name of a file in
# $HOME/.config/arvados/instance_name.conf
#
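# For illustration, such a config file holds KEY=VALUE pairs; the values
# below are placeholders, not real credentials:
#
#     ARVADOS_API_HOST=zzzzz.arvadosapi.com
#     ARVADOS_API_TOKEN=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
#     ARVADOS_API_HOST_INSECURE=false
#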
def api_for_instance(instance_name):
    if '/' in instance_name:
        config_file = instance_name
    else:
        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    try:
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        api_is_insecure = (
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             model=OrderedJsonModel())
    else:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
    return client

# Check if git is available
def check_git_availability():
    try:
        arvados.util.run_command(['git', '--help'])
    except Exception:
        abort('git command is not available. Please ensure git is installed.')

# copy_pipeline_instance(pi_uuid, src, dst, args)
#
# Copies a pipeline instance identified by pi_uuid from src to dst.
#
# If the args.recursive option is set:
#   1. Copies all input collections
#      * For each component in the pipeline, include all collections
#        listed as job dependencies for that component
#   2. Copy docker images
#   3. Copy git repositories
#   4. Copy the pipeline template
#
# The only changes made to the copied pipeline instance are:
#   1. The original pipeline instance UUID is preserved in
#      the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
#   2. The pipeline_template_uuid is changed to the new template uuid.
#   3. The owner_uuid of the instance is changed to the user who
#      copied it.
#
def copy_pipeline_instance(pi_uuid, src, dst, args):
    # Fetch the pipeline instance record.
    pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)

    if args.recursive:
        check_git_availability()

        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy the pipeline template and save the copied template.
        if pi.get('pipeline_template_uuid', None):
            pt = copy_pipeline_template(pi['pipeline_template_uuid'],
                                        src, dst, args)

        # Copy input collections, docker images and git repos.
        pi = copy_collections(pi, src, dst, args)
        copy_git_repos(pi, src, dst, args.dst_git_repo, args)
        copy_docker_images(pi, src, dst, args)

        # Update the fields of the pipeline instance with the copied
        # pipeline template.
        if pi.get('pipeline_template_uuid', None):
            pi['pipeline_template_uuid'] = pt['uuid']
    else:
        # not recursive
        logger.info("Copying only pipeline instance %s.", pi_uuid)
        logger.info("You are responsible for making sure all pipeline dependencies have been updated.")

    # Update the pipeline instance properties, and create the new
    # instance at dst.
    pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
    pi['description'] = "Pipeline copied from {}\n\n{}".format(
        pi_uuid,
        pi['description'] if pi.get('description', None) else '')

    pi['owner_uuid'] = args.project_uuid

    del pi['uuid']

    new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
    return new_pi

def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, basestring):
        return iter([arg])
    else:
        return iter(arg)
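
# A sketch of the intended behavior (illustrative, not part of the
# original source):
#
#     list(filter_iter('foo'))           # -> ['foo']
#     list(filter_iter(['foo', 'bar']))  # -> ['foo', 'bar']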

def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository. If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")

def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes. If they're not, it raises ValueError to indicate that
    the filter is not portable. It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")

def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    return any((name == 'any') or (name in attr_names)
               for name in filter_iter(filter_[0]))

@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)

def migrate_components_filters(template_components, dst_git_repo):
    """Update template component filters in-place for the destination.

    template_components is a dictionary of components in a pipeline template.
    This method walks over each component's filters, and updates them to have
    identical semantics on the destination cluster. It returns a list of
    error strings that describe what filters could not be updated safely.

    dst_git_repo is the name of the destination Git repository, which can
    be None if that is not known.
    """
    errors = []
    for cname, cspec in template_components.iteritems():
        def add_error(errmsg):
            errors.append("{}: {}".format(cname, errmsg))
        if not isinstance(cspec, dict):
            add_error("value is not a component definition")
            continue
        src_repository = cspec.get('repository')
        filters = cspec.get('filters', [])
        if not isinstance(filters, list):
            add_error("filters are not a list")
            continue
        for cfilter in filters:
            if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
                add_error("malformed filter {!r}".format(cfilter))
                continue
            if attr_filtered(cfilter, 'repository'):
                with exception_handler(add_error, ValueError):
                    migrate_repository_filter(cfilter, src_repository, dst_git_repo)
            if attr_filtered(cfilter, 'script_version'):
                with exception_handler(add_error, ValueError):
                    migrate_script_version_filter(cfilter)
    return errors

# copy_pipeline_template(pt_uuid, src, dst, args)
#
# Copies a pipeline template identified by pt_uuid from src to dst.
#
# If args.recursive is True, also copy any collections, docker
# images and git repositories that this template references.
#
# The owner_uuid of the new template is changed to that of the user
# who copied the template.
#
# Returns the copied pipeline template object.
#
def copy_pipeline_template(pt_uuid, src, dst, args):
    # fetch the pipeline template from the source instance
    pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)

    if not args.force_filters:
        filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
        if filter_errors:
            abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
                  "\n".join(filter_errors))

    if args.recursive:
        check_git_availability()

        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy input collections, docker images and git repos.
        pt = copy_collections(pt, src, dst, args)
        copy_git_repos(pt, src, dst, args.dst_git_repo, args)
        copy_docker_images(pt, src, dst, args)

    pt['description'] = "Pipeline template copied from {}\n\n{}".format(
        pt_uuid,
        pt['description'] if pt.get('description', None) else '')
    pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
    del pt['uuid']

    pt['owner_uuid'] = args.project_uuid

    return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)

# copy_workflow(wf_uuid, src, dst, args)
#
# Copies a workflow identified by wf_uuid from src to dst.
#
# If args.recursive is True, also copy any collections
# referenced in the workflow definition yaml.
#
# The owner_uuid of the new workflow is set to any given
# project_uuid or the user who copied the workflow.
#
# Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    # copy collections and docker images
    if args.recursive:
        wf_def = yaml.safe_load(wf["definition"])
        if wf_def is not None:
            locations = []
            docker_images = {}
            graph = wf_def.get('$graph', None)
            if graph is not None:
                workflow_collections(graph, locations, docker_images)
            else:
                workflow_collections(wf_def, locations, docker_images)

            if locations:
                copy_collections(locations, src, dst, args)

            for image in docker_images:
                copy_docker_image(image, docker_images[image], src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid
    return dst.workflows().create(body=wf).execute(num_retries=args.retries)

def workflow_collections(obj, locations, docker_images):
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds) == 2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)

# copy_collections(obj, src, dst, args)
#
# Recursively copies all collections referenced by 'obj' from src
# to dst. obj may be a dict or a list, in which case we run
# copy_collections on every value it contains. If it is a string,
# search it for any substring that matches a collection hash or uuid
# (this will find hidden references to collections like
#     "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
# Returns a copy of obj with any old collection uuids replaced by
# the new ones.
#
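# For example, the hidden reference above names the portable data hash
# "3229739b505d2b878b62aed09895a55a+142". Because a portable data hash is
# content-addressed it reads the same at dst after the copy, while a
# collection UUID reference is rewritten to the UUID of the new copy.
#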
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination. Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj

def migrate_jobspec(jobspec, src, dst, dst_repo, args):
    """Copy a job's script to the destination repository, and update its record.

    Given a jobspec dictionary, this function finds the referenced script from
    src and copies it to dst and dst_repo. It also updates jobspec in place to
    refer to names on the destination.
    """
    repo = jobspec.get('repository')
    if repo is None:
        return
    # script_version is the "script_version" parameter from the source
    # component or job. If no script_version was supplied in the
    # component or job, it is a mistake in the pipeline, but for the
    # purposes of copying the repository, default to "master".
    script_version = jobspec.get('script_version') or 'master'
    script_key = (repo, script_version)
    if script_key not in scripts_copied:
        copy_git_repo(repo, src, dst, dst_repo, script_version, args)
        scripts_copied.add(script_key)
    jobspec['repository'] = dst_repo
    repo_dir = local_repo_dir[repo]
    for version_key in ['script_version', 'supplied_script_version']:
        if version_key in jobspec:
            jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)

# copy_git_repos(p, src, dst, dst_repo, args)
#
# Copies all git repositories referenced by pipeline instance or
# template 'p' from src to dst.
#
# For each component c in the pipeline:
#   * Copy git repositories named in c['repository'] and c['job']['repository'] if present
#   * Rename script versions:
#       * c['script_version']
#       * c['job']['script_version']
#       * c['job']['supplied_script_version']
#     to the commit hashes they resolve to, since any symbolic
#     names (tags, branches) are not preserved in the destination repo.
#
# The pipeline object is updated in place with the new repository
# names. The return value is undefined.
#
def copy_git_repos(p, src, dst, dst_repo, args):
    for component in p['components'].itervalues():
        migrate_jobspec(component, src, dst, dst_repo, args)
        if 'job' in component:
            migrate_jobspec(component['job'], src, dst, dst_repo, args)

def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes
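
# For reference, a manifest line looks like (contents illustrative):
#
#     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
#
# Each "<md5>+<size>" token is a block locator and is counted once, even
# if the same block appears on several lines; a file token such as
# "0:3:foo.txt" fails to parse as a locator and is skipped.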

def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    del c['uuid']

    if not c["name"]:
        c['name'] = "copied from " + collection_uuid

    if 'properties' in c:
        del c['properties']

    c['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection

# copy_collection(obj_uuid, src, dst, args)
#
# Copies the collection identified by obj_uuid from src to dst.
# Returns the collection object created at dst.
#
# If args.progress is True, produce a human-friendly progress
# report.
#
# If a collection with the desired portable_data_hash already
# exists at dst, and args.force is False, copy_collection returns
# the existing collection without copying any blocks. Otherwise
# (if no collection exists or if args.force is True)
# copy_collection copies all of the collection data blocks from src
# to dst.
#
# For this application, it is critical to preserve the
# collection's manifest hash, which is not guaranteed with the
# arvados.CollectionReader and arvados.CollectionWriter classes.
# Copying each block in the collection manually, followed by
# the manifest block, ensures that the collection's manifest
# hash will not change.
#
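# For instance (hypothetical values), a source manifest line such as
#     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
# is rewritten using the signed locator dst Keep returns for the same
# data, e.g.
#     . acbd18db4cc2f85cedef654fccc4a4d8+3+Axxxx...@12345678 0:3:foo.txt
# The md5+size prefix is unchanged, so the portable data hash is preserved.
#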
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be uniquely
        # identified with a particular collection. As a result, it is
        # ambiguous as to what name to use for the copy. Apply some heuristics
        # to pick which collection to get the name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
            ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a pipeline) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = ""
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest += words[0]
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest += ' ' + word
                continue
            blockhash = loc.md5sum
            # copy this block if we haven't seen it before
            # (otherwise, just reuse the existing dst_locator)
            if blockhash not in dst_locators:
                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                if progress_writer:
                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                data = src_keep.get(word)
                dst_locator = dst_keep.put(data)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
            dst_manifest += ' ' + dst_locators[blockhash]
        dst_manifest += "\n"

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)

    c['manifest_text'] = dst_manifest
    return create_collection_from(c, src, dst, args)

def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    priority = https_url + other_url + http_url

    git_config = []
    git_url = None
    for url in priority:
        if url.startswith("http"):
            u = urlparse.urlsplit(url)
            baseurl = urlparse.urlunsplit((u.scheme, u.netloc, "", "", ""))
            git_config = ["-c", "credential.%s/.username=none" % baseurl,
                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
        else:
            git_config = []

        try:
            logger.debug("trying %s", url)
            arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
                                     env={"HOME": os.environ["HOME"],
                                          "ARVADOS_API_TOKEN": api.api_token,
                                          "GIT_ASKPASS": "/bin/false"})
        except arvados.errors.CommandFailedError:
            pass
        else:
            git_url = url
            break

    if not git_url:
        raise Exception('Cannot access git repository, tried {}'
                        .format(priority))

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warn("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        else:
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)

# copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
#
# Copies commits from git repository 'src_git_repo' on Arvados
# instance 'src' to 'dst_git_repo' on 'dst'. Both src_git_repo
# and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
# or "yourname/yourrepo").
#
# All commits will be copied to a destination branch named for the
# source repository URL.
#
# The destination repository must already exist.
#
# The user running this command must be authenticated
# to both repositories.
#
def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
    # Identify the fetch and push URLs for the git repositories.

    (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
    (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")

    logger.debug('src_git_url: {}'.format(src_git_url))
    logger.debug('dst_git_url: {}'.format(dst_git_url))

    dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
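    # For example (illustrative URL): src_git_url
    # "https://git.zzzzz.arvadosapi.com/foo.git" and script_version "master"
    # yield dst_branch "https_git_zzzzz_arvadosapi_com_foo_git_master".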

    # Copy git commits from src repo to dst repo.
    if src_git_repo not in local_repo_dir:
        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
        arvados.util.run_command(
            ["git"] + src_git_config + ["clone", "--bare", src_git_url,
             local_repo_dir[src_git_repo]],
            cwd=os.path.dirname(local_repo_dir[src_git_repo]),
            env={"HOME": os.environ["HOME"],
                 "ARVADOS_API_TOKEN": src.api_token,
                 "GIT_ASKPASS": "/bin/false"})
        arvados.util.run_command(
            ["git", "remote", "add", "dst", dst_git_url],
            cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(
        ["git", "branch", dst_branch, script_version],
        cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
                             cwd=local_repo_dir[src_git_repo],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": dst.api_token,
                                  "GIT_ASKPASS": "/bin/false"})

def copy_docker_images(pipeline, src, dst, args):
    """Copy any docker images named in the pipeline components'
    runtime_constraints field from src to dst."""

    logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
    for c_name, c_info in pipeline['components'].iteritems():
        if ('runtime_constraints' in c_info and
            'docker_image' in c_info['runtime_constraints']):
            copy_docker_image(
                c_info['runtime_constraints']['docker_image'],
                c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
                src, dst, args)

def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.
    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))

# git_rev_parse(rev, repo)
#
# Returns the 40-character commit hash corresponding to 'rev' in
# git repository 'repo' (which must be the path of a local git
# repository)
#
def git_rev_parse(rev, repo):
    gitout, giterr = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return gitout.strip()

# uuid_type(api, object_uuid)
#
# Returns the name of the class that object_uuid belongs to, based on
# the second field of the uuid. This function consults the api's
# schema to identify the object class.
#
# It returns a string such as 'Collection', 'PipelineInstance', etc.
#
# Special case: if handed a Keep locator hash, return 'Collection'.
#
def uuid_type(api, object_uuid):
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
        return 'Collection'
    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
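
# For example (made-up identifier): in "zzzzz-4zz18-xxxxxxxxxxxxxxx" the
# middle field "4zz18" is the uuidPrefix the discovery schema maps to
# 'Collection', while a bare locator such as
# "acbd18db4cc2f85cedef654fccc4a4d8+3" matches the special case above.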

def abort(msg, code=1):
    logger.info("arv-copy: %s", msg)
    exit(code)

# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)

def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
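
# Sample output (illustrative): with bytes_expected known, human_progress
# returns something like "\rzzzzz-4zz18-xxxxxxxxxxxxxxx: 25M / 100M 25.0% ",
# overwriting the same terminal line on each report; machine_progress
# emits one newline-terminated record per report instead.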

class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")

if __name__ == '__main__':
    main()