3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
5 # Copies an object from Arvados instance src to instance dst.
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst. If either of these files is not found,
17 # arv-copy will issue an error.
34 import arvados.commands._util as arv_cmd
35 import arvados.commands.keepdocker
37 from arvados.api import OrderedJsonModel
# Matches a (possibly abbreviated) Git commit hash: 1-40 lowercase hex digits.
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# local paths,
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
local_repo_dir = {}

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
src_owner_uuid = None
def main():
    """Command-line entry point: parse arguments and copy one object."""
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--force-filters', action='store_true', default=False,
        help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
    copy_opts.add_argument(
        '--src', dest='source_arvados', required=True,
        help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--dst', dest='destination_arvados', required=True,
        help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
    copy_opts.add_argument(
        '--dst-git-repo', dest='dst_git_repo',
        help='The name of the destination git repository. Required when copying a pipeline recursively.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the pipeline should be copied.')
    copy_opts.add_argument(
        '--allow-git-http-src', action="store_true",
        help='Allow cloning git repositories over insecure http')
    copy_opts.add_argument(
        '--allow-git-http-dst', action="store_true",
        help='Allow pushing git repositories over insecure http')

    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)

    parser = argparse.ArgumentParser(
        description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados)
    dst_arv = api_for_instance(args.destination_arvados)

    # Default the destination project to the current user's home project.
    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)
    if t == 'Collection':
        set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
        result = copy_collection(args.object_uuid,
                                 src_arv, dst_arv,
                                 args)
    elif t == 'PipelineInstance':
        set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
        result = copy_pipeline_instance(args.object_uuid,
                                        src_arv, dst_arv,
                                        args)
    elif t == 'PipelineTemplate':
        set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
        result = copy_pipeline_template(args.object_uuid,
                                        src_arv, dst_arv, args)
    else:
        abort("cannot copy object {} of type {}".format(args.object_uuid, t))

    # Clean up any outstanding temp git repositories.
    for d in local_repo_dir.values():
        shutil.rmtree(d, ignore_errors=True)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if 'error_token' in result or 'uuid' not in result:
        logger.error("API server returned an error result: {}".format(result))
        exit(1)

    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)
def set_src_owner_uuid(resource, uuid, args):
    """Fetch the named object and record its owner_uuid in the module global.

    resource is an API resource collection (e.g. src.collections());
    uuid identifies the object being copied.
    """
    global src_owner_uuid
    record = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
165 # api_for_instance(instance_name)
# Creates an API client for the Arvados instance identified by instance_name.
170 # If instance_name contains a slash, it is presumed to be a path
171 # (either local or absolute) to a file with Arvados configuration
174 # Otherwise, it is presumed to be the name of a file in
175 # $HOME/.config/arvados/instance_name.conf
def api_for_instance(instance_name):
    """Return an API client for the Arvados instance named instance_name.

    If instance_name contains a slash it is treated as a path to a config
    file; otherwise it names $HOME/.config/arvados/{instance_name}.conf.
    Aborts if the file cannot be read or lacks credentials.
    """
    if '/' in instance_name:
        config_file = instance_name
    else:
        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    try:
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        # Accept the usual spellings of a true value for the insecure flag.
        api_is_insecure = (
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             model=OrderedJsonModel())
    else:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
    return client
205 # Check if git is available
def check_git_availability():
    """Abort with a helpful message unless the git command is runnable."""
    try:
        arvados.util.run_command(['git', '--help'])
    except Exception:
        abort('git command is not available. Please ensure git is installed.')
212 # copy_pipeline_instance(pi_uuid, src, dst, args)
214 # Copies a pipeline instance identified by pi_uuid from src to dst.
216 # If the args.recursive option is set:
217 # 1. Copies all input collections
218 # * For each component in the pipeline, include all collections
219 # listed as job dependencies for that component)
220 # 2. Copy docker images
221 # 3. Copy git repositories
222 # 4. Copy the pipeline template
224 # The only changes made to the copied pipeline instance are:
225 # 1. The original pipeline instance UUID is preserved in
226 # the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
227 # 2. The pipeline_template_uuid is changed to the new template uuid.
# 3. The owner_uuid of the instance is changed to the user who copied it.
def copy_pipeline_instance(pi_uuid, src, dst, args):
    """Copy the pipeline instance pi_uuid from src to dst; return the new record.

    With args.recursive, also copies the template, input collections,
    docker images and git repositories the instance depends on.
    """
    # Fetch the pipeline instance record.
    pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)

    if args.recursive:
        check_git_availability()

        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy the pipeline template and save the copied template.
        if pi.get('pipeline_template_uuid', None):
            pt = copy_pipeline_template(pi['pipeline_template_uuid'],
                                        src, dst, args)

        # Copy input collections, docker images and git repos.
        pi = copy_collections(pi, src, dst, args)
        copy_git_repos(pi, src, dst, args.dst_git_repo, args)
        copy_docker_images(pi, src, dst, args)

        # Update the fields of the pipeline instance with the copied
        # template's new uuid.
        if pi.get('pipeline_template_uuid', None):
            pi['pipeline_template_uuid'] = pt['uuid']

    else:
        logger.info("Copying only pipeline instance %s.", pi_uuid)
        logger.info("You are responsible for making sure all pipeline dependencies have been updated.")

    # Update the pipeline instance properties, and create the new
    # instance at dst.
    pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
    pi['description'] = "Pipeline copied from {}\n\n{}".format(
        pi_uuid,
        pi['description'] if pi.get('description', None) else '')

    pi['owner_uuid'] = args.project_uuid

    # The server assigns a fresh uuid to the copy.
    del pi['uuid']

    new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
    return new_pi
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, basestring):
        return iter((arg,))
    else:
        return iter(arg)
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository.  If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes.  If they're not, it raises ValueError to indicate that
    the filter is not portable.  It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    for name in filter_iter(filter_[0]):
        if (name == 'any') or (name in attr_names):
            return True
    return False
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)
def migrate_components_filters(template_components, dst_git_repo):
    """Update template component filters in-place for the destination.

    template_components is a dictionary of components in a pipeline template.
    This method walks over each component's filters, and updates them to have
    identical semantics on the destination cluster.  It returns a list of
    error strings that describe what filters could not be updated safely.

    dst_git_repo is the name of the destination Git repository, which can
    be None if that is not known.
    """
    errors = []
    for cname, cspec in template_components.iteritems():
        def add_error(errmsg):
            errors.append("{}: {}".format(cname, errmsg))
        if not isinstance(cspec, dict):
            add_error("value is not a component definition")
            continue
        src_repository = cspec.get('repository')
        filters = cspec.get('filters', [])
        if not isinstance(filters, list):
            add_error("filters are not a list")
            continue
        for cfilter in filters:
            if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
                add_error("malformed filter {!r}".format(cfilter))
                continue
            if attr_filtered(cfilter, 'repository'):
                with exception_handler(add_error, ValueError):
                    migrate_repository_filter(cfilter, src_repository, dst_git_repo)
            if attr_filtered(cfilter, 'script_version'):
                with exception_handler(add_error, ValueError):
                    migrate_script_version_filter(cfilter)
    return errors
362 # copy_pipeline_template(pt_uuid, src, dst, args)
364 # Copies a pipeline template identified by pt_uuid from src to dst.
366 # If args.recursive is True, also copy any collections, docker
367 # images and git repositories that this template references.
369 # The owner_uuid of the new template is changed to that of the user
370 # who copied the template.
372 # Returns the copied pipeline template object.
def copy_pipeline_template(pt_uuid, src, dst, args):
    """Copy the pipeline template pt_uuid from src to dst; return the copy.

    With args.recursive, also copies collections, docker images and git
    repositories the template references.  Aborts if component filters
    cannot be translated safely and --force-filters was not given.
    """
    # fetch the pipeline template from the source instance
    pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)

    if not args.force_filters:
        filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
        if filter_errors:
            abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
                  "\n".join(filter_errors))

    if args.recursive:
        check_git_availability()

        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy input collections, docker images and git repos.
        pt = copy_collections(pt, src, dst, args)
        copy_git_repos(pt, src, dst, args.dst_git_repo, args)
        copy_docker_images(pt, src, dst, args)

    pt['description'] = "Pipeline template copied from {}\n\n{}".format(
        pt_uuid,
        pt['description'] if pt.get('description', None) else '')
    pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)

    pt['owner_uuid'] = args.project_uuid

    return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
404 # copy_collections(obj, src, dst, args)
406 # Recursively copies all collections referenced by 'obj' from src
407 # to dst. obj may be a dict or a list, in which case we run
408 # copy_collections on every value it contains. If it is a string,
409 # search it for any substring that matches a collection hash or uuid
410 # (this will find hidden references to collections like
411 # "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
413 # Returns a copy of obj with any old collection uuids replaced by
def copy_collections(obj, src, dst, args):
    """Recursively copy every collection referenced by obj from src to dst.

    obj may be a dict or list (recursed into), or a string (searched for
    collection hashes/uuids, which are copied and substituted).  Returns a
    copy of obj with old collection identifiers replaced by new ones.
    """

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj
def migrate_jobspec(jobspec, src, dst, dst_repo, args):
    """Copy a job's script to the destination repository, and update its record.

    Given a jobspec dictionary, this function finds the referenced script from
    src and copies it to dst and dst_repo.  It also updates jobspec in place to
    refer to names on the destination.
    """
    repo = jobspec.get('repository')
    if repo is None:
        # Nothing to migrate for a component with no repository.
        return
    # script_version is the "script_version" parameter from the source
    # component or job.  If no script_version was supplied in the
    # component or job, it is a mistake in the pipeline, but for the
    # purposes of copying the repository, default to "master".
    script_version = jobspec.get('script_version') or 'master'
    script_key = (repo, script_version)
    if script_key not in scripts_copied:
        copy_git_repo(repo, src, dst, dst_repo, script_version, args)
        scripts_copied.add(script_key)
    jobspec['repository'] = dst_repo
    repo_dir = local_repo_dir[repo]
    # Symbolic names (tags, branches) are not preserved in the destination
    # repo, so resolve them to commit hashes.
    for version_key in ['script_version', 'supplied_script_version']:
        if version_key in jobspec:
            jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
472 # copy_git_repos(p, src, dst, dst_repo, args)
474 # Copies all git repositories referenced by pipeline instance or
475 # template 'p' from src to dst.
477 # For each component c in the pipeline:
478 # * Copy git repositories named in c['repository'] and c['job']['repository'] if present
479 # * Rename script versions:
480 # * c['script_version']
481 # * c['job']['script_version']
482 # * c['job']['supplied_script_version']
483 # to the commit hashes they resolve to, since any symbolic
484 # names (tags, branches) are not preserved in the destination repo.
486 # The pipeline object is updated in place with the new repository
487 # names. The return value is undefined.
def copy_git_repos(p, src, dst, dst_repo, args):
    """Copy every git repository referenced by pipeline p's components.

    Migrates both the component spec itself and, when present, the nested
    'job' record; p is updated in place.
    """
    for component in p['components'].itervalues():
        specs = [component]
        if 'job' in component:
            specs.append(component['job'])
        for jobspec in specs:
            migrate_jobspec(jobspec, src, dst, dst_repo, args)
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        # The first word is the stream name; the rest are locators/files.
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    # The destination server assigns a fresh uuid.
    del c['uuid']

    if not c.get('name'):
        c['name'] = "copied from " + collection_uuid

    if 'properties' in c:
        del c['properties']

    c['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
547 # copy_collection(obj_uuid, src, dst, args)
549 # Copies the collection identified by obj_uuid from src to dst.
550 # Returns the collection object created at dst.
552 # If args.progress is True, produce a human-friendly progress
555 # If a collection with the desired portable_data_hash already
556 # exists at dst, and args.force is False, copy_collection returns
557 # the existing collection without copying any blocks. Otherwise
558 # (if no collection exists or if args.force is True)
559 # copy_collection copies all of the collection data blocks from src
562 # For this application, it is critical to preserve the
563 # collection's manifest hash, which is not guaranteed with the
564 # arvados.CollectionReader and arvados.CollectionWriter classes.
565 # Copying each block in the collection manually, followed by
566 # the manifest block, ensures that the collection's manifest
567 # hash will not change.
def copy_collection(obj_uuid, src, dst, args):
    """Copy the collection identified by obj_uuid from src to dst.

    Returns the collection object created at dst.  If args.force is False
    and a matching collection already exists at dst, no blocks are copied.
    Blocks are copied one at a time (rather than via CollectionReader/
    Writer) so the manifest hash is preserved exactly.
    """
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be uniquely
        # identified with a particular collection.  As a result, it is
        # ambiguous as to what name to use for the copy.  Apply some heuristics
        # to pick which collection to get the name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        else:
            # See if there is a collection that's in the same project
            # as the root item (usually a pipeline) being copied.
            c = None
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
            if not c:
                # Didn't find any collections located in the same project, so
                # pick the oldest collection that has a name assigned to it.
                for i in items:
                    if i.get("name"):
                        c = i
                        break
            if not c:
                # None of the collections have names (?!), so just pick the
                # first one.
                c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = ""
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest += words[0]
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest += ' ' + word
                continue
            blockhash = loc.md5sum
            # copy this block if we haven't seen it before
            # (otherwise, just reuse the existing dst_locator)
            if blockhash not in dst_locators:
                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                if progress_writer:
                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                data = src_keep.get(word)
                dst_locator = dst_keep.put(data)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
            dst_manifest += ' ' + dst_locators[blockhash]
        dst_manifest += "\n"

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)

    c['manifest_text'] = dst_manifest
    return create_collection_from(c, src, dst, args)
def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    """Pick a usable clone URL for repo_name on the given Arvados instance.

    Prefers https, then non-http protocols, then plain http; probes each
    candidate with `git ls-remote`.  Returns (git_url, git_config) where
    git_config is the extra `git -c` credential arguments for http(s) URLs.
    Raises Exception if no URL is reachable, or if only an insecure http
    URL works and allow_insecure_http is false.
    """
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    priority = https_url + other_url + http_url

    git_config = []
    git_url = None
    for url in priority:
        if url.startswith("http"):
            # Supply the API token as the git credential for this host.
            u = urlparse.urlsplit(url)
            baseurl = urlparse.urlunsplit((u.scheme, u.netloc, "", "", ""))
            git_config = ["-c", "credential.%s/.username=none" % baseurl,
                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
        else:
            git_config = []

        try:
            logger.debug("trying %s", url)
            arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
                                     env={"HOME": os.environ["HOME"],
                                          "ARVADOS_API_TOKEN": api.api_token,
                                          "GIT_ASKPASS": "/bin/false"})
        except arvados.errors.CommandFailedError:
            pass
        else:
            git_url = url
            break

    if not git_url:
        raise Exception('Cannot access git repository, tried {}'
                        .format(priority))

    if git_url.startswith("http:"):
        if allow_insecure_http:
            # logger.warn is a deprecated alias for logger.warning.
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        else:
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)
743 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
745 # Copies commits from git repository 'src_git_repo' on Arvados
746 # instance 'src' to 'dst_git_repo' on 'dst'. Both src_git_repo
747 # and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
750 # All commits will be copied to a destination branch named for the
751 # source repository URL.
753 # The destination repository must already exist.
755 # The user running this command must be authenticated
756 # to both repositories.
def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
    """Copy commits from src_git_repo on src to dst_git_repo on dst.

    Both repository arguments are names, not UUIDs.  Commits are pushed to
    a destination branch named after the source URL and script_version.
    The destination repository must already exist and the user must be
    authenticated to both.
    """
    # Identify the fetch and push URLs for the git repositories.

    (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
    (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")

    logger.debug('src_git_url: {}'.format(src_git_url))
    logger.debug('dst_git_url: {}'.format(dst_git_url))

    # Branch name must be a valid ref component: collapse non-word chars.
    dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))

    # Copy git commits from src repo to dst repo.
    if src_git_repo not in local_repo_dir:
        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
        arvados.util.run_command(
            ["git"] + src_git_config + ["clone", "--bare", src_git_url,
                                        local_repo_dir[src_git_repo]],
            cwd=os.path.dirname(local_repo_dir[src_git_repo]),
            env={"HOME": os.environ["HOME"],
                 "ARVADOS_API_TOKEN": src.api_token,
                 "GIT_ASKPASS": "/bin/false"})
        arvados.util.run_command(
            ["git", "remote", "add", "dst", dst_git_url],
            cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(
        ["git", "branch", dst_branch, script_version],
        cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
                             cwd=local_repo_dir[src_git_repo],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": dst.api_token,
                                  "GIT_ASKPASS": "/bin/false"})
def copy_docker_images(pipeline, src, dst, args):
    """Copy any docker images named in the pipeline components'
    runtime_constraints field from src to dst."""

    logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
    for c_name, c_info in pipeline['components'].iteritems():
        if ('runtime_constraints' in c_info and
            'docker_image' in c_info['runtime_constraints']):
            copy_docker_image(
                c_info['runtime_constraints']['docker_image'],
                c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
                src, dst, args)
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst.  Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.
    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        # docker_image is itself a Keep locator; copy that collection.
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
828 # git_rev_parse(rev, repo)
830 # Returns the 40-character commit hash corresponding to 'rev' in
831 # git repository 'repo' (which must be the path of a local git
def git_rev_parse(rev, repo):
    """Resolve rev to its 40-character commit hash in the local git
    repository checked out at path repo."""
    stdout, stderr = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return stdout.strip()
839 # uuid_type(api, object_uuid)
841 # Returns the name of the class that object_uuid belongs to, based on
842 # the second field of the uuid. This function consults the api's
843 # schema to identify the object class.
845 # It returns a string such as 'Collection', 'PipelineInstance', etc.
847 # Special case: if handed a Keep locator hash, return 'Collection'.
def uuid_type(api, object_uuid):
    """Return the schema class name ('Collection', 'PipelineInstance', ...)
    that object_uuid belongs to, by matching the uuid's type prefix against
    the API schema; a bare Keep locator hash maps to 'Collection'.
    Returns None if no class matches.
    """
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
        return 'Collection'
    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
def abort(msg, code=1):
    """Log msg and terminate the program with exit status code."""
    # NOTE(review): message is logged at info level even though this is a
    # fatal condition — confirm whether error level was intended.
    logger.info("arv-copy: %s", msg)
    exit(code)
866 # Code for reporting on the progress of a collection upload.
867 # Stolen from arvados.commands.put.ArvPutCollectionWriter
868 # TODO(twp): figure out how to refactor into a shared library
869 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
def machine_progress(obj_uuid, bytes_written, bytes_expected):
    """Return a machine-readable one-line progress report.

    bytes_expected of None is reported as -1 (unknown total).
    """
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0], os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
    """Return a human-friendly progress line (MB and percent when the
    expected total is known, raw byte count otherwise)."""
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter(object):
    """Write progress reports produced by a formatting function to a stream.

    Stolen from arvados.commands.put.ArvPutCollectionWriter.
    """
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        """Format and emit one progress update (no-op if no formatter)."""
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        """Terminate the progress line with a newline."""
        self.outfile.write("\n")
if __name__ == '__main__':
    main()