1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
7 # Copies an object from Arvados instance src to instance dst.
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst. If either of these files is not found,
19 # arv-copy will issue an error.
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
import argparse
import contextlib
import logging
import os
import re
import shutil
import sys
import urllib.parse

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
import ruamel.yaml as yaml

from arvados.api import OrderedJsonModel
from arvados._version import __version__
# Matches a full or abbreviated hexadecimal git commit hash.
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# local directory.
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
local_repo_dir = {}

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied (set by set_src_owner_uuid).
src_owner_uuid = None
def main():
    """Command-line entry point: parse arguments, copy the requested object
    from the source instance to the destination, and exit 0 on success or
    1 on an error result from the API server."""
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--src', dest='source_arvados', required=True,
        help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--dst', dest='destination_arvados', required=True,
        help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)

    parser = argparse.ArgumentParser(
        description='Copy a workflow or collection from one Arvados instance to another.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados)
    dst_arv = api_for_instance(args.destination_arvados)

    # Default the destination project to the current user's home project.
    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)
    if t == 'Collection':
        set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
        result = copy_collection(args.object_uuid,
                                 src_arv, dst_arv,
                                 args)
    elif t == 'Workflow':
        set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
        result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
    else:
        abort("cannot copy object {} of type {}".format(args.object_uuid, t))

    # Clean up any outstanding temp git repositories.
    for d in listvalues(local_repo_dir):
        shutil.rmtree(d, ignore_errors=True)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if 'error_token' in result or 'uuid' not in result:
        logger.error("API server returned an error result: {}".format(result))
        exit(1)
    else:
        logger.info("Success: created copy with uuid {}".format(result['uuid']))
        exit(0)
def set_src_owner_uuid(resource, uuid, args):
    """Fetch the record for 'uuid' from 'resource' and remember its
    owner_uuid in the module-level src_owner_uuid global (used later to
    prefer collections that live in the same project as the root object)."""
    global src_owner_uuid
    record = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
# api_for_instance(instance_name)
#
#     Creates an API client for the Arvados instance identified by
#     instance_name.
#
#     If instance_name contains a slash, it is presumed to be a path
#     (either local or absolute) to a file with Arvados configuration
#     settings.
#
#     Otherwise, it is presumed to be the name of a file in
#     $HOME/.config/arvados/instance_name.conf
#
def api_for_instance(instance_name):
    if '/' in instance_name:
        config_file = instance_name
    else:
        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    try:
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        api_is_insecure = (
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             model=OrderedJsonModel())
    else:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
    return client
# Check if git is available
def check_git_availability():
    """Abort with a helpful message if the 'git' executable cannot be run."""
    try:
        arvados.util.run_command(['git', '--help'])
    except Exception:
        abort('git command is not available. Please ensure git is installed.')
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, basestring):
        # A bare string stands for a one-element list.
        return iter((arg,))
    else:
        return iter(arg)
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository.  If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes.  If they're not, it raises ValueError to indicate that
    the filter is not portable.  It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    for name in filter_iter(filter_[0]):
        if (name == 'any') or (name in attr_names):
            return True
    return False
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)
# copy_workflow(wf_uuid, src, dst, args)
#
#    Copies a workflow identified by wf_uuid from src to dst.
#
#    If args.recursive is True, also copy any collections
#      referenced in the workflow definition yaml.
#
#    The owner_uuid of the new workflow is set to any given
#      project_uuid or the user who copied the template.
#
#    Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    # copy collections and docker images
    if args.recursive:
        wf_def = yaml.safe_load(wf["definition"])
        if wf_def is not None:
            locations = []
            docker_images = {}
            graph = wf_def.get('$graph', None)
            if graph is not None:
                workflow_collections(graph, locations, docker_images)
            else:
                workflow_collections(wf_def, locations, docker_images)

            if locations:
                copy_collections(locations, src, dst, args)

            for image in docker_images:
                copy_docker_image(image, docker_images[image], src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid
    return dst.workflows().create(body=wf).execute(num_retries=args.retries)
def workflow_collections(obj, locations, docker_images):
    """Walk a parsed workflow definition, appending Keep locations to
    'locations' and recording docker image tags in 'docker_images'
    (mapping image name -> tag)."""
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                # Strip the "keep:" scheme prefix to get the bare locator.
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds)==2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)
# copy_collections(obj, src, dst, args)
#
#    Recursively copies all collections referenced by 'obj' from src
#    to dst.  obj may be a dict or a list, in which case we run
#    copy_collections on every value it contains.  If it is a string,
#    search it for any substring that matches a collection hash or uuid
#    (this will find hidden references to collections like
#       "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
#    Returns a copy of obj with any old collection uuids replaced by
#    the new ones.
#
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                # The identifier is unchanged at the destination (e.g. a
                # portable data hash), so map it to itself.
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        # The first word of a manifest line is the stream name; the rest
        # are block locators followed by file tokens.
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size
    return total_bytes
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    body = {}
    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
        body[d] = c[d]

    if not body["name"]:
        body['name'] = "copied from " + collection_uuid

    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
# copy_collection(obj_uuid, src, dst, args)
#
#    Copies the collection identified by obj_uuid from src to dst.
#    Returns the collection object created at dst.
#
#    If args.progress is True, produce a human-friendly progress
#    report.
#
#    If a collection with the desired portable_data_hash already
#    exists at dst, and args.force is False, copy_collection returns
#    the existing collection without copying any blocks.  Otherwise
#    (if no collection exists or if args.force is True)
#    copy_collection copies all of the collection data blocks from src
#    to dst.
#
#    For this application, it is critical to preserve the
#    collection's manifest hash, which is not guaranteed with the
#    arvados.CollectionReader and arvados.CollectionWriter classes.
#    Copying each block in the collection manually, followed by
#    the manifest block, ensures that the collection's manifest
#    hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection.  As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
            ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        else:
            # See if there is a collection that's in the same project
            # as the root item (usually a workflow) being copied.
            c = None
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
            if not c:
                # Didn't find any collections located in the same project, so
                # pick the oldest collection that has a name assigned to it.
                for i in items:
                    if i.get("name"):
                        c = i
                        break
            if not c:
                # None of the collections have names (?!), so just pick the
                # first one.
                c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = ""
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest += words[0]
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest += ' ' + word
                continue
            blockhash = loc.md5sum
            # copy this block if we haven't seen it before
            # (otherwise, just reuse the existing dst_locator)
            if blockhash not in dst_locators:
                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                if progress_writer:
                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                data = src_keep.get(word)
                dst_locator = dst_keep.put(data)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
            dst_manifest += ' ' + dst_locators[blockhash]
        dst_manifest += "\n"

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)

    c['manifest_text'] = dst_manifest
    return create_collection_from(c, src, dst, args)
def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    """Pick a clone URL for repo_name on the given Arvados instance.

    Tries https, then non-http, then plain http clone URLs (in that
    order) by running 'git ls-remote' against each; returns a tuple of
    (git_url, git_config) for the first URL that works, where git_config
    is the extra 'git -c ...' credential arguments needed for http(s)
    URLs.  Raises Exception if no URL is reachable, or if the chosen URL
    is insecure http and allow_insecure_http is not set.
    """
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    priority = https_url + other_url + http_url

    git_config = []
    git_url = None
    for url in priority:
        if url.startswith("http"):
            u = urllib.parse.urlsplit(url)
            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
            # Feed the API token to git as the http password via a
            # one-shot credential helper.
            git_config = ["-c", "credential.%s/.username=none" % baseurl,
                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
        else:
            git_config = []

        try:
            logger.debug("trying %s", url)
            arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
                                     env={"HOME": os.environ["HOME"],
                                          "ARVADOS_API_TOKEN": api.api_token,
                                          "GIT_ASKPASS": "/bin/false"})
        except arvados.errors.CommandFailedError:
            pass
        else:
            git_url = url
            break

    if not git_url:
        raise Exception('Cannot access git repository, tried {}'
                        .format(priority))

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        else:
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.
    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        # docker_image itself is a Keep locator; copy it directly.
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
# git_rev_parse(rev, repo)
#
#    Returns the 40-character commit hash corresponding to 'rev' in
#    git repository 'repo' (which must be the path of a local git
#    repository).
#
def git_rev_parse(rev, repo):
    stdout, _stderr = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return stdout.strip()
# uuid_type(api, object_uuid)
#
#    Returns the name of the class that object_uuid belongs to, based on
#    the second field of the uuid.  This function consults the api's
#    schema to identify the object class.
#
#    It returns a string such as 'Collection', 'Workflow', etc.
#
#    Special case: if handed a Keep locator hash, return 'Collection'.
#
def uuid_type(api, object_uuid):
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
        # A portable data hash identifies a collection.
        return 'Collection'
    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
def abort(msg, code=1):
    """Log 'msg' and terminate the program with exit status 'code'."""
    logger.info("arv-copy: %s", msg)
    exit(code)
673 # Code for reporting on the progress of a collection upload.
674 # Stolen from arvados.commands.put.ArvPutCollectionWriter
675 # TODO(twp): figure out how to refactor into a shared library
676 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
def machine_progress(obj_uuid, bytes_written, bytes_expected):
    """Return a machine-parseable one-line progress report.

    Reports -1 for the total when bytes_expected is None (unknown size).
    """
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0], os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
    """Return a human-readable progress line.

    The leading carriage return lets successive reports overwrite each
    other on a terminal.  Falls back to a raw byte count when the
    expected total is zero or unknown.
    """
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter(object):
    """Write progress reports produced by a formatting function.

    Each report() call formats (obj_uuid, bytes_written, bytes_expected)
    with the supplied progress_func and writes the result to outfile.
    """
    # Formatting callable; None disables reporting.
    _progress_func = None
    # Output stream; class attribute so tests can substitute a buffer.
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        # Terminate the (carriage-return-style) progress line.
        self.outfile.write("\n")
if __name__ == '__main__':
    main()