3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
5 # Copies an object from Arvados instance src to instance dst.
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst. If either of these files is not found,
17 # arv-copy will issue an error.
35 import arvados.commands._util as arv_cmd
36 import arvados.commands.keepdocker
38 from arvados.api import OrderedJsonModel
# Matches a (possibly abbreviated) git commit hash: 1-40 lowercase hex digits.
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# temporary directory each was cloned;
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'

# Map of collections that have been copied in this session:
# source collection UUID (or portable data hash) -> the UUID it got at
# the destination (or the unchanged portable data hash).
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
# Command-line options for arv-copy.  These control how much gets copied
# (--recursive/--force), the endpoints (--src/--dst), the destination
# placement (--project-uuid/--dst-git-repo), and progress reporting.
copy_opts = argparse.ArgumentParser(add_help=False)

copy_opts.add_argument(
    '-v', '--verbose', dest='verbose', action='store_true',
    help='Verbose output.')
copy_opts.add_argument(
    '--progress', dest='progress', action='store_true',
    help='Report progress on copying collections. (default)')
copy_opts.add_argument(
    '--no-progress', dest='progress', action='store_false',
    help='Do not report progress on copying collections.')
copy_opts.add_argument(
    '-f', '--force', dest='force', action='store_true',
    help='Perform copy even if the object appears to exist at the remote destination.')
copy_opts.add_argument(
    '--force-filters', action='store_true', default=False,
    help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
copy_opts.add_argument(
    '--src', dest='source_arvados', required=True,
    help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
copy_opts.add_argument(
    '--dst', dest='destination_arvados', required=True,
    help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
copy_opts.add_argument(
    '--recursive', dest='recursive', action='store_true',
    help='Recursively copy any dependencies for this object. (default)')
copy_opts.add_argument(
    '--no-recursive', dest='recursive', action='store_false',
    help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
copy_opts.add_argument(
    '--dst-git-repo', dest='dst_git_repo',
    help='The name of the destination git repository. Required when copying a pipeline recursively.')
copy_opts.add_argument(
    '--project-uuid', dest='project_uuid',
    help='The UUID of the project at the destination to which the pipeline should be copied.')
copy_opts.add_argument(
    '--allow-git-http-src', action="store_true",
    help='Allow cloning git repositories over insecure http')
copy_opts.add_argument(
    '--allow-git-http-dst', action="store_true",
    help='Allow pushing git repositories over insecure http')

# NOTE(review): the positional-argument name line (the object UUID,
# surfaced below as args.object_uuid) is elided from this excerpt.
copy_opts.add_argument(
    help='The UUID of the object to be copied.')
copy_opts.set_defaults(progress=True)
copy_opts.set_defaults(recursive=True)

parser = argparse.ArgumentParser(
    description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
    parents=[copy_opts, arv_cmd.retry_opt])
args = parser.parse_args()

# Set logging verbosity.  NOTE(review): the conditional selecting DEBUG
# (for --verbose) versus INFO is elided from this excerpt.
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.INFO)

# Create API clients for the source and destination instances
src_arv = api_for_instance(args.source_arvados)
dst_arv = api_for_instance(args.destination_arvados)

# Default the destination project to the destination user's own account.
if not args.project_uuid:
    args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

# Identify the kind of object we have been given, and begin copying.
# NOTE(review): the continuation lines of the copy_* calls below are
# elided from this excerpt.
t = uuid_type(src_arv, args.object_uuid)
if t == 'Collection':
    set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
    result = copy_collection(args.object_uuid,
elif t == 'PipelineInstance':
    set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
    result = copy_pipeline_instance(args.object_uuid,
elif t == 'PipelineTemplate':
    set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
    result = copy_pipeline_template(args.object_uuid,
                                    src_arv, dst_arv, args)
    # (final else branch elided) -- unsupported object types abort here.
    abort("cannot copy object {} of type {}".format(args.object_uuid, t))

# Clean up any outstanding temp git repositories.
for d in local_repo_dir.values():
    shutil.rmtree(d, ignore_errors=True)

# If no exception was thrown and the response does not have an
# error_token field, presume success
if 'error_token' in result or 'uuid' not in result:
    logger.error("API server returned an error result: {}".format(result))
logger.info("Success: created copy with uuid {}".format(result['uuid']))
def set_src_owner_uuid(resource, uuid, args):
    """Fetch the named object from the source API and record its
    owner_uuid in the module-level src_owner_uuid global."""
    global src_owner_uuid
    record = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name.
#
# If instance_name contains a slash, it is presumed to be a path
# (either local or absolute) to a file with Arvados configuration
# settings.
#
# Otherwise, it is presumed to be the name of a file in
# $HOME/.config/arvados/instance_name.conf
#
def api_for_instance(instance_name):
    # Resolve the instance name to a concrete config file path.
    if '/' in instance_name:
        config_file = instance_name
        # (else branch elided) -- fall back to the per-user config dir.
        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    # (try: line elided)
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        # Unreadable/missing config is fatal: the user must fix it.
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        # Treat any of the usual truthy spellings as "insecure allowed".
        # (assignment line for api_is_insecure is elided here)
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             model=OrderedJsonModel())
        # (return of client and else branch elided)
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
# Check if git is available
def check_git_availability():
    # Probe for a usable git executable by running a harmless command.
    # NOTE(review): the try/except wrapper around these two lines is
    # elided from this excerpt; abort() is the failure path.
        arvados.util.run_command(['git', '--help'])
        abort('git command is not available. Please ensure git is installed.')
# copy_pipeline_instance(pi_uuid, src, dst, args)
#
# Copies a pipeline instance identified by pi_uuid from src to dst.
#
# If the args.recursive option is set:
#   1. Copies all input collections
#      * For each component in the pipeline, include all collections
#        listed as job dependencies for that component)
#   2. Copy docker images
#   3. Copy git repositories
#   4. Copy the pipeline template
#
# The only changes made to the copied pipeline instance are:
#   1. The original pipeline instance UUID is preserved in
#      the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
#   2. The pipeline_template_uuid is changed to the new template uuid.
#   3. The owner_uuid of the instance is changed to the user who
#      copied it.
#
def copy_pipeline_instance(pi_uuid, src, dst, args):
    # Fetch the pipeline instance record.
    pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)

    # (the "if args.recursive:" guard is elided from this excerpt)
    check_git_availability()

    if not args.dst_git_repo:
        abort('--dst-git-repo is required when copying a pipeline recursively.')
    # Copy the pipeline template and save the copied template.
    # (continuation of the copy_pipeline_template call elided)
    if pi.get('pipeline_template_uuid', None):
        pt = copy_pipeline_template(pi['pipeline_template_uuid'],
    # Copy input collections, docker images and git repos.
    pi = copy_collections(pi, src, dst, args)
    copy_git_repos(pi, src, dst, args.dst_git_repo, args)
    copy_docker_images(pi, src, dst, args)

    # Update the fields of the pipeline instance with the copied
    # template's new uuid.
    if pi.get('pipeline_template_uuid', None):
        pi['pipeline_template_uuid'] = pt['uuid']

    # (non-recursive branch) Warn that dependencies are not being copied.
    logger.info("Copying only pipeline instance %s.", pi_uuid)
    logger.info("You are responsible for making sure all pipeline dependencies have been updated.")

    # Update the pipeline instance properties, and create the new
    # pipeline record at the destination.
    pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
    # (first format argument of the description string elided)
    pi['description'] = "Pipeline copied from {}\n\n{}".format(
        pi['description'] if pi.get('description', None) else '')
    pi['owner_uuid'] = args.project_uuid

    # (return of new_pi elided from this excerpt)
    new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, basestring):
        # (the string/list yield branches are elided from this excerpt)
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository.  If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        # Simple equality match on the source repo: rewrite the operand.
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        # Single-element "in" match: rewrite the operand list.
        repo_filter[2] = [dst_repository]
    # (else: line elided) -- anything more complex is not translatable.
        raise ValueError("repository filter is not a simple source match")
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes.  If they're not, it raises ValueError to indicate that
    the filter is not portable.  It could be extended to make other
    transformations in the future.
    """
    # Commit hashes are cluster-independent; symbolic names are not.
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False.

    A filter whose attribute field is 'any' matches every attribute.
    """
    names = set(attr_names)
    for attr in filter_iter(filter_[0]):
        if attr == 'any' or attr in names:
            return True
    return False
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    # (try:/yield lines elided from this excerpt)
    except exc_types as error:
        # (handler invocation elided)
def migrate_components_filters(template_components, dst_git_repo):
    """Update template component filters in-place for the destination.

    template_components is a dictionary of components in a pipeline template.
    This method walks over each component's filters, and updates them to have
    identical semantics on the destination cluster.  It returns a list of
    error strings that describe what filters could not be updated safely.

    dst_git_repo is the name of the destination Git repository, which can
    be None if that is not known.
    """
    # (initialization of the errors list is elided from this excerpt)
    for cname, cspec in template_components.iteritems():
        def add_error(errmsg):
            # Prefix every recorded problem with its component name.
            errors.append("{}: {}".format(cname, errmsg))
        if not isinstance(cspec, dict):
            add_error("value is not a component definition")
            # (line elided from this excerpt)
        src_repository = cspec.get('repository')
        filters = cspec.get('filters', [])
        if not isinstance(filters, list):
            add_error("filters are not a list")
            # (line elided from this excerpt)
        for cfilter in filters:
            # Every filter must be an [attribute, operator, operand] triple.
            if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
                add_error("malformed filter {!r}".format(cfilter))
                # (line elided from this excerpt)
            # Translate repository and script_version filters; any
            # ValueError from the migration helpers is recorded as an error.
            if attr_filtered(cfilter, 'repository'):
                with exception_handler(add_error, ValueError):
                    migrate_repository_filter(cfilter, src_repository, dst_git_repo)
            if attr_filtered(cfilter, 'script_version'):
                with exception_handler(add_error, ValueError):
                    migrate_script_version_filter(cfilter)
    # (return of the errors list elided from this excerpt)
# copy_pipeline_template(pt_uuid, src, dst, args)
#
# Copies a pipeline template identified by pt_uuid from src to dst.
#
# If args.recursive is True, also copy any collections, docker
# images and git repositories that this template references.
#
# The owner_uuid of the new template is changed to that of the user
# who copied the template.
#
# Returns the copied pipeline template object.
#
def copy_pipeline_template(pt_uuid, src, dst, args):
    # fetch the pipeline template from the source instance
    pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)

    # Unless --force-filters was given, translate the component filters
    # for the destination cluster, refusing to copy anything untranslatable.
    # (the guard testing filter_errors is elided from this excerpt)
    if not args.force_filters:
        filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
        abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
              "\n".join(filter_errors))

    # (the "if args.recursive:" guard is elided from this excerpt)
    check_git_availability()

    if not args.dst_git_repo:
        abort('--dst-git-repo is required when copying a pipeline recursively.')
    # Copy input collections, docker images and git repos.
    pt = copy_collections(pt, src, dst, args)
    copy_git_repos(pt, src, dst, args.dst_git_repo, args)
    copy_docker_images(pt, src, dst, args)

    # Record provenance in the copied record and re-home it.
    # (first format argument of the description string elided)
    pt['description'] = "Pipeline template copied from {}\n\n{}".format(
        pt['description'] if pt.get('description', None) else '')
    pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
    pt['owner_uuid'] = args.project_uuid

    return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
# copy_collections(obj, src, dst, args)
#
# Recursively copies all collections referenced by 'obj' from src
# to dst.  obj may be a dict or a list, in which case we run
# copy_collections on every value it contains.  If it is a string,
# search it for any substring that matches a collection hash or uuid
# (this will find hidden references to collections like
# "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
# Returns a copy of obj with any old collection identifiers replaced by
# the new ones.
#
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            # When the copy carries the same uuid/portable data hash as
            # src_id, record src_id itself; otherwise record the new uuid.
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            # (else: line elided from this excerpt)
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        # (return of obj elided from this excerpt)
    elif isinstance(obj, dict):
        # Rebuild the mapping with every value recursively processed.
        # (continuation of this expression is elided from this excerpt)
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
def migrate_jobspec(jobspec, src, dst, dst_repo, args):
    """Copy a job's script to the destination repository, and update its record.

    Given a jobspec dictionary, this function finds the referenced script from
    src and copies it to dst and dst_repo.  It also updates jobspec in place to
    refer to names on the destination.
    """
    repo = jobspec.get('repository')
    # (lines elided from this excerpt)
    # script_version is the "script_version" parameter from the source
    # component or job.  If no script_version was supplied in the
    # component or job, it is a mistake in the pipeline, but for the
    # purposes of copying the repository, default to "master".
    script_version = jobspec.get('script_version') or 'master'
    script_key = (repo, script_version)
    # Copy each (repository, version) pair at most once per run.
    if script_key not in scripts_copied:
        copy_git_repo(repo, src, dst, dst_repo, script_version, args)
        scripts_copied.add(script_key)
    jobspec['repository'] = dst_repo
    repo_dir = local_repo_dir[repo]
    # Pin any symbolic versions to the commit hashes they resolve to.
    for version_key in ['script_version', 'supplied_script_version']:
        if version_key in jobspec:
            jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
473 # copy_git_repos(p, src, dst, dst_repo, args)
475 # Copies all git repositories referenced by pipeline instance or
476 # template 'p' from src to dst.
478 # For each component c in the pipeline:
479 # * Copy git repositories named in c['repository'] and c['job']['repository'] if present
480 # * Rename script versions:
481 # * c['script_version']
482 # * c['job']['script_version']
483 # * c['job']['supplied_script_version']
484 # to the commit hashes they resolve to, since any symbolic
485 # names (tags, branches) are not preserved in the destination repo.
487 # The pipeline object is updated in place with the new repository
488 # names. The return value is undefined.
def copy_git_repos(p, src, dst, dst_repo, args):
    """Migrate the git references of every component in pipeline object p
    (and of each component's 'job' record, when one is present) from src
    to dst via migrate_jobspec, updating p in place."""
    for cspec in p['components'].itervalues():
        specs = [cspec]
        if 'job' in cspec:
            specs.append(cspec['job'])
        for spec in specs:
            migrate_jobspec(spec, src, dst, dst_repo, args)
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""
    # (initialization of the byte counter and seen-locator map is elided)
    for line in manifest_text.splitlines():
        # (tokenization of the manifest line into `words` is elided)
        for word in words[1:]:
            # Words after the stream name are block locators or file tokens;
            # only parseable locators contribute to the total.
            # (the try: line wrapping the parse is elided)
                loc = arvados.KeepLocator(word)
            # (the except clause is elided from this excerpt)
                continue  # this word isn't a locator, skip it
            # Count each distinct block exactly once, keyed by md5sum.
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size
    # (return of total_bytes elided from this excerpt)
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available.  Returns the new collection record."""
    collection_uuid = c['uuid']
    # (lines elided from this excerpt)
    c['name'] = "copied from " + collection_uuid

    if 'properties' in c:
        # (handling of the properties field is elided from this excerpt)
    c['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            # Carry over only the link fields that stay meaningful, then
            # point the new link at the new collection and project.
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
# copy_collection(obj_uuid, src, dst, args)
#
# Copies the collection identified by obj_uuid from src to dst.
# Returns the collection object created at dst.
#
# If args.progress is True, produce a human-friendly progress
# report.
#
# If a collection with the desired portable_data_hash already
# exists at dst, and args.force is False, copy_collection returns
# the existing collection without copying any blocks.  Otherwise
# (if no collection exists or if args.force is True)
# copy_collection copies all of the collection data blocks from src
# to dst.
#
# For this application, it is critical to preserve the
# collection's manifest hash, which is not guaranteed with the
# arvados.CollectionReader and arvados.CollectionWriter classes.
# Copying each block in the collection manually, followed by
# the manifest block, ensures that the collection's manifest
# hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be uniquely
        # identified with a particular collection.  As a result, it is
        # ambiguous as to what name to use for the copy.  Apply some heuristics
        # to pick which collection to get the name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")

        # (empty-result guard elided from this excerpt)
        logger.warning("Could not find collection with portable data hash %s", obj_uuid)

        # There's only one collection with the PDH, so use that.
        # (single-item branch elided from this excerpt)

        # See if there is a collection that's in the same project
        # as the root item (usually a pipeline) being copied.
        # (surrounding loop elided from this excerpt)
        if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
            # (selection of this item elided)

        # Didn't find any collections located in the same project, so
        # pick the oldest collection that has a name assigned to it.
        # (selection loop elided from this excerpt)

        # None of the collections have names (?!), so just pick the
        # first one.  (selection elided from this excerpt)

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    # (else: line elided)
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.  (the force guard itself is elided from this excerpt)
    if 'portable_data_hash' in c:
        colhash = c['portable_data_hash']
    # (else branch, falling back to the uuid, elided)
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                # A matching owner, name and hash means the copy exists.
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    # (early return elided from this excerpt)
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)

    bytes_expected = total_collection_size(manifest)
    # (the progress-option conditional around these two lines is elided)
    progress_writer = ProgressWriter(human_progress)
    progress_writer = None

    # (initialization of dst_manifest/dst_locators/bytes_written elided)
    for line in manifest.splitlines():
        # (tokenization of the line into `words` is elided)
        dst_manifest += words[0]
        for word in words[1:]:
            # (try: line elided)
            loc = arvados.KeepLocator(word)
            # (except clause elided)
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest += ' ' + word
            blockhash = loc.md5sum
            # copy this block if we haven't seen it before
            # (otherwise, just reuse the existing dst_locator)
            if blockhash not in dst_locators:
                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                # (progress_writer None-check elided)
                progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                data = src_keep.get(word)
                dst_locator = dst_keep.put(data)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
            dst_manifest += ' ' + dst_locators[blockhash]

    # (final progress_writer None-check elided)
    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
    progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)

    c['manifest_text'] = dst_manifest
    return create_collection_from(c, src, dst, args)
def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    """Pick a reachable clone URL for the named Arvados git repository.

    Returns a (git_url, git_config) tuple, where git_config is a list of
    extra `git -c` arguments needed to authenticate to that URL.
    Prefers https URLs, then non-http schemes, then plain http (which is
    refused unless allow_insecure_http is set).
    """
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    # Partition the advertised clone URLs by scheme.
    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    priority = https_url + other_url + http_url

    # (the loop over the candidate URLs is elided from this excerpt)
    if url.startswith("http"):
        u = urlparse.urlsplit(url)
        baseurl = urlparse.urlunsplit((u.scheme, u.netloc, "", "", ""))
        # Supply the API token as the git password through a one-shot
        # credential helper, and never prompt interactively.
        git_config = ["-c", "credential.%s/.username=none" % baseurl,
                      "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]

    # Probe the URL with `git ls-remote` before committing to it.
    # (the try: line around this command is elided)
    logger.debug("trying %s", url)
    arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": api.api_token,
                                  "GIT_ASKPASS": "/bin/false"})
    except arvados.errors.CommandFailedError:
        # (fallthrough to the next candidate URL is elided)

    # No candidate URL worked.  (format argument elided)
    raise Exception('Cannot access git repository, tried {}'

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warn("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        # (else: line elided)
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)
# copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
#
# Copies commits from git repository 'src_git_repo' on Arvados
# instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
# and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
# rather than a uuid).
#
# All commits will be copied to a destination branch named for the
# source repository URL.
#
# The destination repository must already exist.
#
# The user running this command must be authenticated
# to both repositories.
#
def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
    # Identify the fetch and push URLs for the git repositories.

    (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
    (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")

    logger.debug('src_git_url: {}'.format(src_git_url))
    logger.debug('dst_git_url: {}'.format(dst_git_url))

    # Derive a destination branch name from the source URL and version so
    # it cannot collide with branches copied from other sources.
    dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))

    # Copy git commits from src repo to dst repo.
    if src_git_repo not in local_repo_dir:
        # First time seeing this repository this run: clone it (bare)
        # into a fresh temp directory and register the dst remote.
        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
        arvados.util.run_command(
            ["git"] + src_git_config + ["clone", "--bare", src_git_url,
                                        local_repo_dir[src_git_repo]],
            cwd=os.path.dirname(local_repo_dir[src_git_repo]),
            env={"HOME": os.environ["HOME"],
                 "ARVADOS_API_TOKEN": src.api_token,
                 "GIT_ASKPASS": "/bin/false"})
        arvados.util.run_command(
            ["git", "remote", "add", "dst", dst_git_url],
            cwd=local_repo_dir[src_git_repo])
    # Point the destination branch at the requested version and push it.
    arvados.util.run_command(
        ["git", "branch", dst_branch, script_version],
        cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
                             cwd=local_repo_dir[src_git_repo],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": dst.api_token,
                                  "GIT_ASKPASS": "/bin/false"})
def copy_docker_images(pipeline, src, dst, args):
    """Copy any docker images named in the pipeline components'
    runtime_constraints field from src to dst."""

    logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
    for c_name, c_info in pipeline['components'].iteritems():
        if ('runtime_constraints' in c_info and
            'docker_image' in c_info['runtime_constraints']):
            # Copy the referenced image, defaulting the tag to 'latest'.
            # (the enclosing copy_docker_image(...) call line is elided)
                c_info['runtime_constraints']['docker_image'],
                c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst.  Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.
    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        # The image was named by a Keep locator; copy that collection.
        dst_image_col = copy_collection(docker_image, src, dst, args)
    # (else: line elided)
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
829 # git_rev_parse(rev, repo)
831 # Returns the 40-character commit hash corresponding to 'rev' in
832 # git repository 'repo' (which must be the path of a local git
def git_rev_parse(rev, repo):
    """Resolve rev to a full commit hash in the local git checkout at repo."""
    stdout, _ = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return stdout.strip()
# uuid_type(api, object_uuid)
#
# Returns the name of the class that object_uuid belongs to, based on
# the second field of the uuid.  This function consults the api's
# schema to identify the object class.
#
# It returns a string such as 'Collection', 'PipelineInstance', etc.
#
# Special case: if handed a Keep locator hash, return 'Collection'.
#
def uuid_type(api, object_uuid):
    # Keep locators (md5 + size [+ hints]) always denote collections.
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
        # (return of 'Collection' elided from this excerpt)
    p = object_uuid.split('-')
    # (extraction of the type prefix from p is elided from this excerpt)
    # Match the prefix against every schema's uuidPrefix.
    for k in api._schema.schemas:
        obj_class = api._schema.schemas[k].get('uuidPrefix', None)
        if type_prefix == obj_class:
            # (return of the matching class name elided)
def abort(msg, code=1):
    """Log an arv-copy failure message and terminate the program.

    (The exit call with `code` is elided from this excerpt.)
    """
    # NOTE(review): this is an error path but logs at INFO level; consider
    # logger.error so aborts remain visible at default verbosity.
    logger.info("arv-copy: %s", msg)
# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    # Machine-readable one-line progress record; -1 stands in for an
    # unknown expected size.  (Some format arguments are elided here.)
    return "{} {}: {} {} written {} total\n".format(
        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
    # Human-friendly progress line, redrawn in place via the leading '\r'.
    # (the conditional choosing between the two returns is elided here)
        # Known total: show MiB written/expected and a percentage.
        return "\r{}: {}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
        # Unknown total: just show the raw byte count.
        return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter(object):
    # Writes progress reports produced by a pluggable formatter
    # (machine_progress or human_progress) to an output stream.
    _progress_func = None

    def __init__(self, progress_func):
        # progress_func(obj_uuid, bytes_written, bytes_expected) -> str,
        # or None to disable reporting.
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        # Only emit output when a formatter was supplied.
        # (the enclosing outfile.write( call line is elided here)
        if self._progress_func is not None:
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    # (the finish() method definition line is elided from this excerpt)
        self.outfile.write("\n")
905 if __name__ == '__main__':