# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# arv-copy [--recursive] [--no-recursive] object-uuid
#
# Copies an object from Arvados instance src to instance dst.
#
# By default, arv-copy recursively copies any dependent objects
# necessary to make the object functional in the new instance
# (e.g. for a workflow, arv-copy copies the workflow,
# input collections, and docker images). If
# --no-recursive is given, arv-copy copies only the single record
# identified by object-uuid.
#
# The user must have files $HOME/.config/arvados/{src}.conf and
# $HOME/.config/arvados/{dst}.conf with valid login credentials for
# instances src and dst. If either of these files is not found,
# arv-copy will issue an error.
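#
# Example (cluster names and object uuid below are hypothetical):
#
#   arv-copy --src foo --dst bar foo-4zz18-0123456789abcde
#
# copies collection foo-4zz18-0123456789abcde from cluster foo to
# cluster bar, using credentials from foo.conf and bar.conf.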
from __future__ import division
from future import standard_library
from future.utils import listvalues
standard_library.install_aliases()
from past.builtins import basestring
from builtins import object
import argparse
import contextlib
import io
import logging
import os
import queue
import re
import shutil
import subprocess
import sys
import threading
import urllib.parse

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
import arvados.http_to_keep
import ruamel.yaml as yaml

from arvados._version import __version__

COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# directories.
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
#
local_repo_dir = {}

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
src_owner_uuid = None

def main():
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--src', dest='source_arvados',
        help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will be inferred from the UUID of the object being copied.')
    copy_opts.add_argument(
        '--dst', dest='destination_arvados',
        help='The name of the destination Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will use ARVADOS_API_HOST from the environment.')
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object, and subprojects. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies or subprojects.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
    copy_opts.add_argument(
        '--storage-classes', dest='storage_classes',
        help='Comma separated list of storage classes to be used when saving data to the destination Arvados instance.')
    copy_opts.add_argument(
        "--varying-url-params", type=str, default="",
        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
    copy_opts.add_argument(
        "--prefer-cached-downloads", action="store_true", default=False,
        help="If an HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)

    parser = argparse.ArgumentParser(
        description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()
    if args.storage_classes:
        args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]

    if args.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
        args.source_arvados = args.object_uuid[:5]

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados, args.retries)
    dst_arv = api_for_instance(args.destination_arvados, args.retries)

    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)

    try:
        if t == 'Collection':
            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
            result = copy_collection(args.object_uuid,
                                     src_arv, dst_arv,
                                     args)
        elif t == 'Workflow':
            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
        elif t == 'Group':
            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
        elif t == 'httpURL':
            result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
        else:
            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
    except Exception as e:
        logger.error("%s", e, exc_info=args.verbose)
        exit(1)

    # Clean up any outstanding temp git repositories.
    for d in listvalues(local_repo_dir):
        shutil.rmtree(d, ignore_errors=True)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if result is None or 'error_token' in result or 'uuid' not in result:
        if result:
            logger.error("API server returned an error result: {}".format(result))
        exit(1)

    print(result['uuid'])

    if result.get('partial_error'):
        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
        exit(1)

    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)

def set_src_owner_uuid(resource, uuid, args):
    global src_owner_uuid
    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = c.get("owner_uuid")

# api_for_instance(instance_name)
#
#     Creates an API client for the Arvados instance identified by
#     instance_name.
#
#     If instance_name contains a slash, it is presumed to be a path
#     (either local or absolute) to a file with Arvados configuration
#     credentials.
#
#     Otherwise, it is presumed to be the name of a file in
#     $HOME/.config/arvados/instance_name.conf
#
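#     For reference, a config file consists of VAR=value lines, e.g.
#     (host and token values here are placeholders):
#
#         ARVADOS_API_HOST=zzzzz.example.org
#         ARVADOS_API_TOKEN=v2/zzzzz-gj3su-0123456789abcde/secrettokengoeshere
#         ARVADOS_API_HOST_INSECURE=false
#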
def api_for_instance(instance_name, num_retries):
    if not instance_name:
        # Use environment
        return arvados.api('v1')

    if '/' in instance_name:
        config_file = instance_name
    else:
        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    try:
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        api_is_insecure = (
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             num_retries=num_retries,
                             )
    else:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
    return client

# Check if git is available
def check_git_availability():
    try:
        subprocess.run(
            ['git', '--version'],
            check=True,
            stdout=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        abort('git command is not available. Please ensure git is installed.')

def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, basestring):
        return iter((arg,))
    else:
        return iter(arg)

def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository. If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")

def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes. If they're not, it raises ValueError to indicate that
    the filter is not portable. It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")

def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    return any((name == 'any') or (name in attr_names)
               for name in filter_iter(filter_[0]))

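# For example, attr_filtered(['script_version', '=', 'abc123'], 'script_version')
# is True, as is any filter whose attribute field is 'any'.
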
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)

# copy_workflow(wf_uuid, src, dst, args)
#
#    Copies a workflow identified by wf_uuid from src to dst.
#
#    If args.recursive is True, also copy any collections
#    referenced in the workflow definition yaml.
#
#    The owner_uuid of the new workflow is set to any given
#    project_uuid or the user who copied the template.
#
#    Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    if not wf["definition"]:
        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))

    # copy collections and docker images
    if args.recursive and wf["definition"]:
        wf_def = yaml.safe_load(wf["definition"])
        if wf_def is not None:
            locations = []
            docker_images = {}
            graph = wf_def.get('$graph', None)
            if graph is not None:
                workflow_collections(graph, locations, docker_images)
            else:
                workflow_collections(wf_def, locations, docker_images)

            if locations:
                copy_collections(locations, src, dst, args)

            for image in docker_images:
                copy_docker_image(image, docker_images[image], src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid

    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
                                             ["name", "=", wf["name"]]]).execute()
    if len(existing["items"]) == 0:
        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
    else:
        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)

def workflow_collections(obj, locations, docker_images):
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds) == 2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)

# copy_collections(obj, src, dst, args)
#
#    Recursively copies all collections referenced by 'obj' from src
#    to dst. obj may be a dict or a list, in which case we run
#    copy_collections on every value it contains. If it is a string,
#    search it for any substring that matches a collection hash or uuid
#    (this will find hidden references to collections like
#       "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
#    Returns a copy of obj with any old collection uuids replaced by
#    the new ones.
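#
#    For example (uuids here are hypothetical), a string value
#    "zzzzz-4zz18-0123456789abcde" in obj would be replaced by the
#    uuid of the copy of that collection created at dst.
#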
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination. Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).

        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj

def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

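    # For reference, a manifest line looks like this (locators here are
    # hypothetical):
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 0:3:foo.txt 3:3:bar.txt
    # Each locator embeds the block's md5 and size; a block that appears
    # more than once is only counted once.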
    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes

def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    body = {}
    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
        body[d] = c[d]

    if not body["name"]:
        body['name'] = "copied from " + collection_uuid

    if args.storage_classes:
        body['storage_classes_desired'] = args.storage_classes

    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection

# copy_collection(obj_uuid, src, dst, args)
#
#    Copies the collection identified by obj_uuid from src to dst.
#    Returns the collection object created at dst.
#
#    If args.progress is True, produce a human-friendly progress
#    report.
#
#    If a collection with the desired portable_data_hash already
#    exists at dst, and args.force is False, copy_collection returns
#    the existing collection without copying any blocks. Otherwise
#    (if no collection exists or if args.force is True)
#    copy_collection copies all of the collection data blocks from src
#    to dst.
#
#    For this application, it is critical to preserve the
#    collection's manifest hash, which is not guaranteed with the
#    arvados.CollectionReader and arvados.CollectionWriter classes.
#    Copying each block in the collection manually, followed by
#    the manifest block, ensures that the collection's manifest
#    hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection. As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
            ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            exit(1)

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a workflow) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = io.StringIO()
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)

    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    # go through the words
    # put each block loc into 'get' queue
    # 'get' threads get block and put it into 'put' queue
    # 'put' threads put block and then update dst_locators
    #
    # after going through the whole manifest we go back through it
    # again and build dst_manifest
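    #
    # Pipeline sketch (N = threadcount; a None on a queue tells a
    # worker to exit):
    #
    #   manifest locators -> get_queue -> [get_thread x N] -> put_queue
    #   put_queue -> [put_thread x N] -> dst_keep.put, dst_locators
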
    lock = threading.Lock()

    # the get queue should be unbounded because we'll add all the
    # block hashes we want to get, but these are small
    get_queue = queue.Queue()

    threadcount = 4

    # the put queue contains full data blocks
    # and if 'get' is faster than 'put' we could end up consuming
    # a great deal of RAM if it isn't bounded.
    put_queue = queue.Queue(threadcount)
    transfer_error = []

    def get_thread():
        while True:
            word = get_queue.get()
            if word is None:
                put_queue.put(None)
                get_queue.task_done()
                return

            blockhash = arvados.KeepLocator(word).md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    get_queue.task_done()
                    continue

            try:
                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))
            except Exception as e:
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
            finally:
                get_queue.task_done()

    def put_thread():
        nonlocal bytes_written
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return

            word, data = item
            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    put_queue.task_done()
                    continue

            try:
                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
                with lock:
                    dst_locators[blockhash] = dst_locator
                    bytes_written += loc.size
                    if progress_writer:
                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
            except Exception as e:
                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
                transfer_error.append(e)
            finally:
                put_queue.task_done()

    for line in manifest.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                continue

            get_queue.put(word)

    for i in range(0, threadcount):
        get_queue.put(None)

    for i in range(0, threadcount):
        threading.Thread(target=get_thread, daemon=True).start()

    for i in range(0, threadcount):
        threading.Thread(target=put_thread, daemon=True).start()

    get_queue.join()
    put_queue.join()

    if len(transfer_error) > 0:
        return {"error_token": "Failed to transfer blocks"}

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest.write(words[0])
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest.write(' ')
                dst_manifest.write(word)
                continue
            blockhash = loc.md5sum
            dst_manifest.write(' ')
            dst_manifest.write(dst_locators[blockhash])
        dst_manifest.write("\n")

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)

def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    priority = https_url + other_url + http_url

    git_config = []
    git_url = None

    for url in priority:
        if url.startswith("http"):
            u = urllib.parse.urlsplit(url)
            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
            git_config = ["-c", "credential.%s/.username=none" % baseurl,
                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
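            # The credential helper above answers git's "get" prompt
            # with password=$ARVADOS_API_TOKEN (and a dummy username),
            # so the Arvados token passed through the environment below
            # authenticates the clone without any interactive prompt.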
        else:
            git_config = []

        try:
            logger.debug("trying %s", url)
            subprocess.run(
                ['git', *git_config, 'ls-remote', url],
                check=True,
                env={
                    'ARVADOS_API_TOKEN': api.api_token,
                    'GIT_ASKPASS': '/bin/false',
                    'HOME': os.environ['HOME'],
                },
                stdout=subprocess.DEVNULL,
            )
        except subprocess.CalledProcessError:
            pass
        else:
            git_url = url
            break

    if not git_url:
        raise Exception('Cannot access git repository, tried {}'
                        .format(priority))

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        else:
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)

def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))

def copy_project(obj_uuid, src, dst, owner_uuid, args):

    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
    else:
        project_record = existing["items"][0]

    dst.groups().update(uuid=project_record["uuid"],
                        body={"group": {
                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    partial_error = ""

    # Copy collections
    try:
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
                         src, dst, args)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    # Copy subprojects
    for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_project(g["uuid"], src, dst, project_record["uuid"], args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    if partial_error:
        project_record["partial_error"] = partial_error

    return project_record

# git_rev_parse(rev, repo)
#
#    Returns the 40-character commit hash corresponding to 'rev' in
#    git repository 'repo' (which must be the path of a local git
#    repository)
#
def git_rev_parse(rev, repo):
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        check=True,
        cwd=repo,
        stdout=subprocess.PIPE,
        text=True,
    )
    # With text=True, subprocess.run captures stdout as a string, so
    # strip it directly rather than treating it as a file object.
    return proc.stdout.strip()

# uuid_type(api, object_uuid)
#
#    Returns the name of the class that object_uuid belongs to, based on
#    the second field of the uuid. This function consults the api's
#    schema to identify the object class.
#
#    It returns a string such as 'Collection', 'Workflow', etc.
#
#    Special case: if handed a Keep locator hash, return 'Collection'.
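#
#    For example, given "zzzzz-4zz18-0123456789abcde" (a hypothetical
#    uuid), the type prefix "4zz18" matches the Collection schema's
#    uuidPrefix, so this returns 'Collection'.
#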
def uuid_type(api, object_uuid):
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        return 'Collection'
    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        return 'httpURL'
    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None

def copy_from_http(url, src, dst, args):

    project_uuid = args.project_uuid
    varying_url_params = args.varying_url_params
    prefer_cached_downloads = args.prefer_cached_downloads

    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
                                                   varying_url_params=varying_url_params,
                                                   prefer_cached_downloads=prefer_cached_downloads)
    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
                                               varying_url_params=varying_url_params,
                                               prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}

def abort(msg, code=1):
    logger.info("arv-copy: %s", msg)
    exit(code)

# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0], os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)

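# machine_progress produces lines like the following (values here are
# illustrative):
#   arv-copy 12345: zzzzz-4zz18-0123456789abcde 1048576 written 4194304 total
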
def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)

class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")

if __name__ == '__main__':
    main()