3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
5 # Copies an object from Arvados instance src to instance dst.
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst. If either of these files is not found,
17 # arv-copy will issue an error.
19 from __future__ import division
20 from future import standard_library
21 standard_library.install_aliases()
22 from past.builtins import basestring
23 from builtins import object
39 import arvados.commands._util as arv_cmd
40 import arvados.commands.keepdocker
41 import ruamel.yaml as yaml
43 from arvados.api import OrderedJsonModel
44 from arvados._version import __version__
# Matches a full or abbreviated (1-40 lowercase hex digit) git commit hash.
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# directories.
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
# Command-line parsing. copy_opts holds the arv-copy-specific options;
# it is combined below with the shared retry options into the final parser.
# NOTE(review): these statements appear to be the body of the command's
# main(); the enclosing def is not visible in this excerpt.
copy_opts = argparse.ArgumentParser(add_help=False)

copy_opts.add_argument(
    '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
copy_opts.add_argument(
    '-v', '--verbose', dest='verbose', action='store_true',
    help='Verbose output.')
copy_opts.add_argument(
    '--progress', dest='progress', action='store_true',
    help='Report progress on copying collections. (default)')
copy_opts.add_argument(
    '--no-progress', dest='progress', action='store_false',
    help='Do not report progress on copying collections.')
copy_opts.add_argument(
    '-f', '--force', dest='force', action='store_true',
    help='Perform copy even if the object appears to exist at the remote destination.')
copy_opts.add_argument(
    '--force-filters', action='store_true', default=False,
    help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
copy_opts.add_argument(
    '--src', dest='source_arvados', required=True,
    help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
copy_opts.add_argument(
    '--dst', dest='destination_arvados', required=True,
    help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
copy_opts.add_argument(
    '--recursive', dest='recursive', action='store_true',
    help='Recursively copy any dependencies for this object. (default)')
copy_opts.add_argument(
    '--no-recursive', dest='recursive', action='store_false',
    help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
copy_opts.add_argument(
    '--dst-git-repo', dest='dst_git_repo',
    help='The name of the destination git repository. Required when copying a pipeline recursively.')
copy_opts.add_argument(
    '--project-uuid', dest='project_uuid',
    help='The UUID of the project at the destination to which the pipeline should be copied.')
copy_opts.add_argument(
    '--allow-git-http-src', action="store_true",
    help='Allow cloning git repositories over insecure http')
copy_opts.add_argument(
    '--allow-git-http-dst', action="store_true",
    help='Allow pushing git repositories over insecure http')

# Positional argument naming the object to copy.
# NOTE(review): the argument-name line of this add_argument call is
# elided in this excerpt (presumably 'object_uuid' — confirm).
copy_opts.add_argument(
    help='The UUID of the object to be copied.')
# Progress reporting and recursive copying are on by default.
copy_opts.set_defaults(progress=True)
copy_opts.set_defaults(recursive=True)

parser = argparse.ArgumentParser(
    description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
    parents=[copy_opts, arv_cmd.retry_opt])
args = parser.parse_args()
# Configure logging verbosity.
# NOTE(review): the conditional on args.verbose selecting between these
# two levels is elided in this excerpt.
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.INFO)

# Create API clients for the source and destination instances
src_arv = api_for_instance(args.source_arvados)
dst_arv = api_for_instance(args.destination_arvados)

# Default the destination project to the destination user's home project.
if not args.project_uuid:
    args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

# Identify the kind of object we have been given, and begin copying.
t = uuid_type(src_arv, args.object_uuid)
if t == 'Collection':
    set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
    result = copy_collection(args.object_uuid,
elif t == 'PipelineInstance':
    set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
    result = copy_pipeline_instance(args.object_uuid,
elif t == 'PipelineTemplate':
    set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
    result = copy_pipeline_template(args.object_uuid,
                                    src_arv, dst_arv, args)
elif t == 'Workflow':
    set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
    result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
# NOTE(review): the final 'else:' introducing this fallback is elided
# in this excerpt.
abort("cannot copy object {} of type {}".format(args.object_uuid, t))

# Clean up any outstanding temp git repositories.
for d in list(local_repo_dir.values()):
    shutil.rmtree(d, ignore_errors=True)

# If no exception was thrown and the response does not have an
# error_token field, presume success
if 'error_token' in result or 'uuid' not in result:
    logger.error("API server returned an error result: {}".format(result))
logger.info("Success: created copy with uuid {}".format(result['uuid']))
def set_src_owner_uuid(resource, uuid, args):
    """Record the owner of the object being copied.

    Fetches the record identified by uuid from the given API resource and
    stores its owner_uuid in the module-global src_owner_uuid.
    """
    global src_owner_uuid
    src_owner_uuid = resource.get(uuid=uuid).execute(
        num_retries=args.retries).get("owner_uuid")
# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name.
#
# If instance_name contains a slash, it is presumed to be a path
# (either local or absolute) to a file with Arvados configuration
# settings.
#
# Otherwise, it is presumed to be the name of a file in
# $HOME/.config/arvados/instance_name.conf
#
def api_for_instance(instance_name):
    # A slash means the caller supplied an explicit config file path.
    if '/' in instance_name:
        config_file = instance_name
    # NOTE(review): the 'else:' for the shorthand-name case is elided in
    # this excerpt.
    config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    # NOTE(review): the 'try:' opening this handler is elided in this excerpt.
    cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        # Treat any of the usual truthy strings as "insecure allowed".
        cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
            ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             model=OrderedJsonModel())
    abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
# Check if git is available
def check_git_availability():
    # Probe for a working git executable; the output is discarded.
    # NOTE(review): the try/except wrapping this probe is elided in this
    # excerpt, so the abort below is presumably the failure branch.
    arvados.util.run_command(['git', '--help'])
    abort('git command is not available. Please ensure git is installed.')
# copy_pipeline_instance(pi_uuid, src, dst, args)
#
# Copies a pipeline instance identified by pi_uuid from src to dst.
#
# If the args.recursive option is set:
# 1. Copies all input collections
#      * For each component in the pipeline, include all collections
#        listed as job dependencies for that component)
# 2. Copy docker images
# 3. Copy git repositories
# 4. Copy the pipeline template
#
# The only changes made to the copied pipeline instance are:
# 1. The original pipeline instance UUID is preserved in
#    the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
# 2. The pipeline_template_uuid is changed to the new template uuid.
# 3. The owner_uuid of the instance is changed to the user who
#    copied it.
#
def copy_pipeline_instance(pi_uuid, src, dst, args):
    # Fetch the pipeline instance record.
    pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)

    # NOTE(review): the 'if args.recursive:' guard around this recursive
    # branch is elided in this excerpt.
    check_git_availability()

    if not args.dst_git_repo:
        abort('--dst-git-repo is required when copying a pipeline recursively.')
    # Copy the pipeline template and save the copied template.
    if pi.get('pipeline_template_uuid', None):
        pt = copy_pipeline_template(pi['pipeline_template_uuid'],

    # Copy input collections, docker images and git repos.
    pi = copy_collections(pi, src, dst, args)
    copy_git_repos(pi, src, dst, args.dst_git_repo, args)
    copy_docker_images(pi, src, dst, args)

    # Update the fields of the pipeline instance with the copied
    # template uuid.
    if pi.get('pipeline_template_uuid', None):
        pi['pipeline_template_uuid'] = pt['uuid']

    # Non-recursive copy: warn that dependencies are the user's problem.
    logger.info("Copying only pipeline instance %s.", pi_uuid)
    logger.info("You are responsible for making sure all pipeline dependencies have been updated.")

    # Update the pipeline instance properties, and create the new
    # instance at the destination.
    pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
    pi['description'] = "Pipeline copied from {}\n\n{}".format(
        pi['description'] if pi.get('description', None) else '')

    pi['owner_uuid'] = args.project_uuid

    new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, basestring):
        # NOTE(review): the yield branches for the string and list cases
        # are elided in this excerpt.
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository. If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        # Simple equality match: swap in the destination repo name.
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        # Single-element 'in' match: swap in a one-element destination list.
        repo_filter[2] = [dst_repository]
    # NOTE(review): the 'else:' introducing this fallback is elided in
    # this excerpt.
    raise ValueError("repository filter is not a simple source match")
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes. If they're not, it raises ValueError to indicate that
    the filter is not portable. It could be extended to make other
    transformations in the future.
    """
    for operand in filter_iter(version_filter[2]):
        if not COMMIT_HASH_RE.match(operand):
            raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False.

    A filter whose field is the literal 'any' matches every attribute.
    """
    for name in filter_iter(filter_[0]):
        if name == 'any' or name in attr_names:
            return True
    return False
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    # NOTE(review): the try/yield statements of this context manager are
    # elided in this excerpt.
    except exc_types as error:
def migrate_components_filters(template_components, dst_git_repo):
    """Update template component filters in-place for the destination.

    template_components is a dictionary of components in a pipeline template.
    This method walks over each component's filters, and updates them to have
    identical semantics on the destination cluster. It returns a list of
    error strings that describe what filters could not be updated safely.

    dst_git_repo is the name of the destination Git repository, which can
    be None if that is not known.
    """
    # NOTE(review): the initialization of the 'errors' accumulator is
    # elided in this excerpt.
    for cname, cspec in template_components.items():
        def add_error(errmsg):
            # Prefix each recorded error with the component name for context.
            errors.append("{}: {}".format(cname, errmsg))
        if not isinstance(cspec, dict):
            add_error("value is not a component definition")
        src_repository = cspec.get('repository')
        filters = cspec.get('filters', [])
        if not isinstance(filters, list):
            add_error("filters are not a list")
        for cfilter in filters:
            # Each filter must be a [field, operator, operand] triple.
            if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
                add_error("malformed filter {!r}".format(cfilter))
            if attr_filtered(cfilter, 'repository'):
                with exception_handler(add_error, ValueError):
                    migrate_repository_filter(cfilter, src_repository, dst_git_repo)
            if attr_filtered(cfilter, 'script_version'):
                with exception_handler(add_error, ValueError):
                    migrate_script_version_filter(cfilter)
# copy_pipeline_template(pt_uuid, src, dst, args)
#
# Copies a pipeline template identified by pt_uuid from src to dst.
#
# If args.recursive is True, also copy any collections, docker
# images and git repositories that this template references.
#
# The owner_uuid of the new template is changed to that of the user
# who copied the template.
#
# Returns the copied pipeline template object.
#
def copy_pipeline_template(pt_uuid, src, dst, args):
    # fetch the pipeline template from the source instance
    pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)

    if not args.force_filters:
        filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
        # NOTE(review): the check that filter_errors is non-empty before
        # aborting is elided in this excerpt.
        abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
              "\n".join(filter_errors))

    # NOTE(review): the 'if args.recursive:' guard around this branch is
    # elided in this excerpt.
    check_git_availability()

    if not args.dst_git_repo:
        abort('--dst-git-repo is required when copying a pipeline recursively.')
    # Copy input collections, docker images and git repos.
    pt = copy_collections(pt, src, dst, args)
    copy_git_repos(pt, src, dst, args.dst_git_repo, args)
    copy_docker_images(pt, src, dst, args)

    # Annotate the copy with its provenance.
    pt['description'] = "Pipeline template copied from {}\n\n{}".format(
        pt['description'] if pt.get('description', None) else '')
    pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)

    pt['owner_uuid'] = args.project_uuid

    return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
# copy_workflow(wf_uuid, src, dst, args)
#
# Copies a workflow identified by wf_uuid from src to dst.
#
# If args.recursive is True, also copy any collections
# referenced in the workflow definition yaml.
#
# The owner_uuid of the new workflow is set to any given
# project_uuid or the user who copied the template.
#
# Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    # copy collections and docker images
    # NOTE(review): the initialization of 'locations' and 'docker_images'
    # is elided in this excerpt.
    wf_def = yaml.safe_load(wf["definition"])
    if wf_def is not None:
        # A CWL '$graph' document contains several processes; scan them all.
        graph = wf_def.get('$graph', None)
        if graph is not None:
            workflow_collections(graph, locations, docker_images)
        workflow_collections(wf_def, locations, docker_images)

        copy_collections(locations, src, dst, args)

        for image in docker_images:
            copy_docker_image(image, docker_images[image], src, dst, args)

    # copy the workflow itself
    wf['owner_uuid'] = args.project_uuid
    return dst.workflows().create(body=wf).execute(num_retries=args.retries)
def workflow_collections(obj, locations, docker_images):
    # Recursively scan a workflow definition object, appending Keep
    # locators to 'locations' and recording docker image name -> tag
    # pairs into 'docker_images'. Both output arguments are mutated.
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        # Strip the "keep:" scheme prefix to get the bare locator.
        if loc.startswith("keep:"):
            locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            # Default to the 'latest' tag when no tag is given.
            tag = ds[1] if len(ds)==2 else 'latest'
            docker_images[ds[0]] = tag

        # NOTE(review): the 'for x in obj:' loops driving these recursive
        # calls are elided in this excerpt.
        workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        workflow_collections(x, locations, docker_images)
# copy_collections(obj, src, dst, args)
#
# Recursively copies all collections referenced by 'obj' from src
# to dst. obj may be a dict or a list, in which case we run
# copy_collections on every value it contains. If it is a string,
# search it for any substring that matches a collection hash or uuid
# (this will find hidden references to collections like
# "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
# Returns a copy of obj with any old collection uuids replaced by
# the new ones.
#
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination. Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            # A source id that is the copy's own uuid or PDH maps to itself.
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            # NOTE(review): the 'else:' pairing these two assignments is
            # elided in this excerpt.
            collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
    elif isinstance(obj, dict):
        # Preserve the concrete mapping type of the input.
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
def migrate_jobspec(jobspec, src, dst, dst_repo, args):
    """Copy a job's script to the destination repository, and update its record.

    Given a jobspec dictionary, this function finds the referenced script from
    src and copies it to dst and dst_repo. It also updates jobspec in place to
    refer to names on the destination.
    """
    repo = jobspec.get('repository')
    # script_version is the "script_version" parameter from the source
    # component or job. If no script_version was supplied in the
    # component or job, it is a mistake in the pipeline, but for the
    # purposes of copying the repository, default to "master".
    script_version = jobspec.get('script_version') or 'master'
    script_key = (repo, script_version)
    # Only copy each (repository, version) pair once per run.
    if script_key not in scripts_copied:
        copy_git_repo(repo, src, dst, dst_repo, script_version, args)
        scripts_copied.add(script_key)
    jobspec['repository'] = dst_repo
    repo_dir = local_repo_dir[repo]
    # Pin symbolic versions to commit hashes, since branch/tag names are
    # not preserved in the destination repository.
    for version_key in ['script_version', 'supplied_script_version']:
        if version_key in jobspec:
            jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
543 # copy_git_repos(p, src, dst, dst_repo, args)
545 # Copies all git repositories referenced by pipeline instance or
546 # template 'p' from src to dst.
548 # For each component c in the pipeline:
549 # * Copy git repositories named in c['repository'] and c['job']['repository'] if present
550 # * Rename script versions:
551 # * c['script_version']
552 # * c['job']['script_version']
553 # * c['job']['supplied_script_version']
554 # to the commit hashes they resolve to, since any symbolic
555 # names (tags, branches) are not preserved in the destination repo.
557 # The pipeline object is updated in place with the new repository
558 # names. The return value is undefined.
def copy_git_repos(p, src, dst, dst_repo, args):
    """Copy every git repository referenced by pipeline object p to dst.

    Each component of p (and that component's 'job' record, when one is
    present) is rewritten in place to refer to dst_repo.
    """
    for cspec in p['components'].values():
        migrate_jobspec(cspec, src, dst, dst_repo, args)
        if 'job' in cspec:
            migrate_jobspec(cspec['job'], src, dst, dst_repo, args)
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""
    # NOTE(review): the initialization of the byte total and the
    # locators-seen dict, plus the try: around locator parsing, are
    # elided in this excerpt.
    for line in manifest_text.splitlines():
        # Skip the stream name (first word); the rest may be locators.
        for word in words[1:]:
            loc = arvados.KeepLocator(word)
            continue  # this word isn't a locator, skip it
            # Count each distinct block (keyed by md5) only once.
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""
    collection_uuid = c['uuid']

    # Name the copy after its source so its provenance is visible.
    c['name'] = "copied from " + collection_uuid

    # NOTE(review): the body of this properties check is elided in this
    # excerpt.
    if 'properties' in c:

    c['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
618 # copy_collection(obj_uuid, src, dst, args)
620 # Copies the collection identified by obj_uuid from src to dst.
621 # Returns the collection object created at dst.
623 # If args.progress is True, produce a human-friendly progress
626 # If a collection with the desired portable_data_hash already
627 # exists at dst, and args.force is False, copy_collection returns
628 # the existing collection without copying any blocks. Otherwise
629 # (if no collection exists or if args.force is True)
630 # copy_collection copies all of the collection data blocks from src
633 # For this application, it is critical to preserve the
634 # collection's manifest hash, which is not guaranteed with the
635 # arvados.CollectionReader and arvados.CollectionWriter classes.
636 # Copying each block in the collection manually, followed by
637 # the manifest block, ensures that the collection's manifest
638 # hash will not change.
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be uniquely
        # identified with a particular collection. As a result, it is
        # ambigious as to what name to use for the copy. Apply some heuristics
        # to pick which collection to get the name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")

        # NOTE(review): the conditionals selecting among the cases below
        # (no match / single match / multiple matches) are elided in this
        # excerpt.
        logger.warning("Could not find collection with portable data hash %s", obj_uuid)

        # There's only one collection with the PDH, so use that.

        # See if there is a collection that's in the same project
        # as the root item (usually a pipeline) being copied.
        if i.get("owner_uuid") == src_owner_uuid and i.get("name"):

        # Didn't find any collections located in the same project, so
        # pick the oldest collection that has a name assigned to it.

        # None of the collections have names (?!), so just pick the
        # first one.

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same maninfest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    # Assume this is an actual collection uuid, so fetch it directly.
    c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if 'portable_data_hash' in c:
        colhash = c['portable_data_hash']
    dstcol = dst.collections().list(
        filters=[['portable_data_hash', '=', colhash]]
    ).execute(num_retries=args.retries)
    if dstcol['items_available'] > 0:
        for d in dstcol['items']:
            if ((args.project_uuid == d['owner_uuid']) and
                (c.get('name') == d['name']) and
                (c['portable_data_hash'] == d['portable_data_hash'])):
            # Reuse the existing destination copy's manifest text.
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)

    bytes_expected = total_collection_size(manifest)
    # NOTE(review): the args.progress conditional choosing between these
    # two assignments is elided in this excerpt.
    progress_writer = ProgressWriter(human_progress)
    progress_writer = None

    for line in manifest.splitlines():
        # Keep the stream name; rewrite the locators that follow it.
        dst_manifest += words[0]
        for word in words[1:]:
            loc = arvados.KeepLocator(word)
            # If 'word' can't be parsed as a locator,
            # presume it's a filename.
            dst_manifest += ' ' + word
            blockhash = loc.md5sum
            # copy this block if we haven't seen it before
            # (otherwise, just reuse the existing dst_locator)
            if blockhash not in dst_locators:
                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                data = src_keep.get(word)
                dst_locator = dst_keep.put(data)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
            dst_manifest += ' ' + dst_locators[blockhash]

    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
    progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)

    c['manifest_text'] = dst_manifest
    return create_collection_from(c, src, dst, args)
def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    # Pick a usable clone URL for repo_name on the given API server.
    # Candidates are tried in preference order: https, then non-http
    # schemes, then plain http. Returns (git_url, git_config), where
    # git_config is the extra 'git -c' credential arguments to use.
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    priority = https_url + other_url + http_url

    # NOTE(review): the 'for url in priority:' loop and the try: around
    # the probe below are elided in this excerpt.
    if url.startswith("http"):
        u = urllib.parse.urlsplit(url)
        baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
        # Supply the API token as the git password via an inline shell
        # credential helper, with an explicit 'none' username, so git
        # authenticates without prompting.
        git_config = ["-c", "credential.%s/.username=none" % baseurl,
                      "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]

    logger.debug("trying %s", url)
    # Probe the candidate URL with ls-remote before committing to it;
    # GIT_ASKPASS=/bin/false ensures git never blocks on a prompt.
    arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": api.api_token,
                                  "GIT_ASKPASS": "/bin/false"})
    except arvados.errors.CommandFailedError:

    raise Exception('Cannot access git repository, tried {}'

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)
814 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
816 # Copies commits from git repository 'src_git_repo' on Arvados
817 # instance 'src' to 'dst_git_repo' on 'dst'. Both src_git_repo
818 # and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
821 # All commits will be copied to a destination branch named for the
822 # source repository URL.
824 # The destination repository must already exist.
826 # The user running this command must be authenticated
827 # to both repositories.
def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
    # Identify the fetch and push URLs for the git repositories.

    (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
    (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")

    logger.debug('src_git_url: {}'.format(src_git_url))
    logger.debug('dst_git_url: {}'.format(dst_git_url))

    # Name the destination branch after the source URL and version, with
    # runs of non-word characters collapsed to underscores.
    dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))

    # Copy git commits from src repo to dst repo.
    if src_git_repo not in local_repo_dir:
        # First time this run: make a bare clone in a fresh temp
        # directory, remember its path, and add the destination remote.
        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
        arvados.util.run_command(
            ["git"] + src_git_config + ["clone", "--bare", src_git_url,
                                        local_repo_dir[src_git_repo]],
            cwd=os.path.dirname(local_repo_dir[src_git_repo]),
            env={"HOME": os.environ["HOME"],
                 "ARVADOS_API_TOKEN": src.api_token,
                 "GIT_ASKPASS": "/bin/false"})
        arvados.util.run_command(
            ["git", "remote", "add", "dst", dst_git_url],
            cwd=local_repo_dir[src_git_repo])
    # Create the destination branch at script_version and push it.
    arvados.util.run_command(
        ["git", "branch", dst_branch, script_version],
        cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
                             cwd=local_repo_dir[src_git_repo],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": dst.api_token,
                                  "GIT_ASKPASS": "/bin/false"})
def copy_docker_images(pipeline, src, dst, args):
    """Copy any docker images named in the pipeline components'
    runtime_constraints field from src to dst."""
    logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
    for c_name, c_info in pipeline['components'].items():
        if ('runtime_constraints' in c_info and
            'docker_image' in c_info['runtime_constraints']):
            # NOTE(review): the copy_docker_image(...) call line these
            # arguments belong to is elided in this excerpt.
                c_info['runtime_constraints']['docker_image'],
                c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.
    """
    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        # The "image name" is itself a Keep locator; copy it directly.
        dst_image_col = copy_collection(docker_image, src, dst, args)
    # NOTE(review): the 'else:' introducing this warning is elided in
    # this excerpt.
    logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
899 # git_rev_parse(rev, repo)
901 # Returns the 40-character commit hash corresponding to 'rev' in
902 # git repository 'repo' (which must be the path of a local git
def git_rev_parse(rev, repo):
    """Resolve rev to its 40-character commit hash in the local git
    repository checked out at path repo."""
    stdout, _stderr = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return stdout.strip()
910 # uuid_type(api, object_uuid)
912 # Returns the name of the class that object_uuid belongs to, based on
913 # the second field of the uuid. This function consults the api's
914 # schema to identify the object class.
916 # It returns a string such as 'Collection', 'PipelineInstance', etc.
918 # Special case: if handed a Keep locator hash, return 'Collection'.
def uuid_type(api, object_uuid):
    # A bare Keep locator (32 hex md5 + size hint) denotes a Collection.
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
    # Otherwise look up the uuid's 5-character type prefix (second
    # dash-separated field) in the API discovery schema.
    p = object_uuid.split('-')
    for k in api._schema.schemas:
        obj_class = api._schema.schemas[k].get('uuidPrefix', None)
        if type_prefix == obj_class:
def abort(msg, code=1):
    # Report the fatal message; the process-exit call using 'code' is
    # elided in this excerpt.
    logger.info("arv-copy: %s", msg)
937 # Code for reporting on the progress of a collection upload.
938 # Stolen from arvados.commands.put.ArvPutCollectionWriter
939 # TODO(twp): figure out how to refactor into a shared library
940 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
def machine_progress(obj_uuid, bytes_written, bytes_expected):
    # One-line machine-parseable progress record; an unknown total is
    # reported as -1. (Some format arguments are elided in this excerpt.)
    return "{} {}: {} {} written {} total\n".format(
        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
    # Human-friendly progress line; the leading \r overwrites the
    # previous report in place. When the expected total is known the
    # counts are shown in MiB with a percentage, otherwise raw bytes.
    # NOTE(review): the if/else selecting between these two returns is
    # elided in this excerpt.
    return "\r{}: {}M / {}M {:.1%} ".format(
        bytes_written >> 20, bytes_expected >> 20,
        float(bytes_written) / bytes_expected)
    return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter(object):
    # Callable used to format progress messages; None disables output.
    _progress_func = None

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        # Only emit output when a formatting function was supplied.
        # NOTE(review): the self.outfile.write( call wrapping this
        # expression, and the following finish() method's def line, are
        # elided in this excerpt.
        if self._progress_func is not None:
            self._progress_func(obj_uuid, bytes_written, bytes_expected))
        self.outfile.write("\n")
975 if __name__ == '__main__':