1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 # arv-copy [--recursive] [--no-recursive] object-uuid
7 # Copies an object from Arvados instance src to instance dst.
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst. If either of these files is not found,
19 # arv-copy will issue an error.
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
47 import arvados.commands._util as arv_cmd
48 import arvados.commands.keepdocker
49 import arvados.http_to_keep
50 import ruamel.yaml as yaml
52 from arvados._version import __version__
# Matches an abbreviated or full git commit hash (lowercase hex, 1-40 chars).
54 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
56 logger = logging.getLogger('arvados.arv-copy')
58 # local_repo_dir records which git repositories from the Arvados source
59 # instance have been checked out locally during this run, and to which
61 # e.g. if repository 'twp' from src_arv has been cloned into
62 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
# NOTE(review): the `local_repo_dir = {}` assignment appears to be elided
# from this excerpt — confirm against the full source.
66 # List of collections that have been copied in this session, and their
67 # destination collection UUIDs.
68 collections_copied = {}
70 # Set of (repository, script_version) two-tuples of commits copied in git.
71 scripts_copied = set()
73 # The owner_uuid of the object being copied
# NOTE(review): the `src_owner_uuid = None` assignment appears to be elided
# here; set_src_owner_uuid() below assigns it via `global`.
# ---- Command-line option definitions and top-level copy dispatch ----
# NOTE(review): several lines appear elided from this excerpt (the
# positional object_uuid argument name, the if/else and try scaffolding
# around the dispatch, and the main() entry point); retained byte-for-byte
# otherwise.
77 copy_opts = argparse.ArgumentParser(add_help=False)
79 copy_opts.add_argument(
80 '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
81 help='Print version and exit.')
82 copy_opts.add_argument(
83 '-v', '--verbose', dest='verbose', action='store_true',
84 help='Verbose output.')
85 copy_opts.add_argument(
86 '--progress', dest='progress', action='store_true',
87 help='Report progress on copying collections. (default)')
88 copy_opts.add_argument(
89 '--no-progress', dest='progress', action='store_false',
90 help='Do not report progress on copying collections.')
91 copy_opts.add_argument(
92 '-f', '--force', dest='force', action='store_true',
93 help='Perform copy even if the object appears to exist at the remote destination.')
94 copy_opts.add_argument(
95 '--src', dest='source_arvados',
96 help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will be inferred from the UUID of the object being copied.')
97 copy_opts.add_argument(
98 '--dst', dest='destination_arvados',
99 help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will use ARVADOS_API_HOST from environment.')
100 copy_opts.add_argument(
101 '--recursive', dest='recursive', action='store_true',
102 help='Recursively copy any dependencies for this object, and subprojects. (default)')
103 copy_opts.add_argument(
104 '--no-recursive', dest='recursive', action='store_false',
105 help='Do not copy any dependencies or subprojects.')
106 copy_opts.add_argument(
107 '--project-uuid', dest='project_uuid',
108 help='The UUID of the project at the destination to which the collection or workflow should be copied.')
109 copy_opts.add_argument(
110 '--storage-classes', dest='storage_classes',
111 help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
112 copy_opts.add_argument("--varying-url-params", type=str, default="",
113 help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
115 copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
116 help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
118 copy_opts.add_argument(
120 help='The UUID of the object to be copied.')
121 copy_opts.set_defaults(progress=True)
122 copy_opts.set_defaults(recursive=True)
124 parser = argparse.ArgumentParser(
125 description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
126 parents=[copy_opts, arv_cmd.retry_opt])
127 args = parser.parse_args()
# Normalize --storage-classes into a list, dropping empty entries.
129 if args.storage_classes:
130 args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
133 logger.setLevel(logging.DEBUG)
135 logger.setLevel(logging.INFO)
# Infer the source cluster id from the first field of the object UUID
# when --src was not given.
137 if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
138 args.source_arvados = args.object_uuid[:5]
140 # Create API clients for the source and destination instances
141 src_arv = api_for_instance(args.source_arvados, args.retries)
142 dst_arv = api_for_instance(args.destination_arvados, args.retries)
144 if not args.project_uuid:
145 args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
147 # Identify the kind of object we have been given, and begin copying.
148 t = uuid_type(src_arv, args.object_uuid)
# Dispatch on the detected object type (see uuid_type() below).
151 if t == 'Collection':
152 set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
153 result = copy_collection(args.object_uuid,
156 elif t == 'Workflow':
157 set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
158 result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
160 set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
161 result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
163 result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
165 abort("cannot copy object {} of type {}".format(args.object_uuid, t))
166 except Exception as e:
167 logger.error("%s", e, exc_info=args.verbose)
170 # Clean up any outstanding temp git repositories.
171 for d in listvalues(local_repo_dir):
172 shutil.rmtree(d, ignore_errors=True)
177 # If no exception was thrown and the response does not have an
178 # error_token field, presume success
179 if result is None or 'error_token' in result or 'uuid' not in result:
181 logger.error("API server returned an error result: {}".format(result))
184 print(result['uuid'])
186 if result.get('partial_error'):
187 logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
190 logger.info("Success: created copy with uuid {}".format(result['uuid']))
def set_src_owner_uuid(resource, uuid, args):
    """Fetch the record for 'uuid' via the given API resource and store
    its owner_uuid in the module-global src_owner_uuid."""
    global src_owner_uuid
    record = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
198 # api_for_instance(instance_name)
200 # Creates an API client for the Arvados instance identified by
203 # If instance_name contains a slash, it is presumed to be a path
204 # (either local or absolute) to a file with Arvados configuration
207 # Otherwise, it is presumed to be the name of a file in
208 # $HOME/.config/arvados/instance_name.conf
# NOTE(review): the `else:` and `try:` scaffolding lines appear elided
# from this excerpt (e.g. before the arvados.config.load call, whose
# matching `except` is visible below); retained byte-for-byte.
210 def api_for_instance(instance_name, num_retries):
# With no instance name, fall back to the default client configured
# from the environment (ARVADOS_API_HOST / ARVADOS_API_TOKEN).
211 if not instance_name:
213 return arvados.api('v1')
215 if '/' in instance_name:
216 config_file = instance_name
218 config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
221 cfg = arvados.config.load(config_file)
222 except (IOError, OSError) as e:
223 abort(("Could not open config file {}: {}\n" +
224 "You must make sure that your configuration tokens\n" +
225 "for Arvados instance {} are in {} and that this\n" +
226 "file is readable.").format(
227 config_file, e, instance_name, config_file))
229 if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
# Accept the usual truthy spellings for ARVADOS_API_HOST_INSECURE.
231 cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
232 ['1', 't', 'true', 'y', 'yes']))
233 client = arvados.api('v1',
234 host=cfg['ARVADOS_API_HOST'],
235 token=cfg['ARVADOS_API_TOKEN'],
236 insecure=api_is_insecure,
237 num_retries=num_retries,
240 abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
243 # Check if git is available
# Aborts the program with a friendly message when the `git` executable
# cannot be found on PATH.
# NOTE(review): the `try:` and the `subprocess.run(` call line appear
# elided from this excerpt; retained byte-for-byte.
244 def check_git_availability():
247 ['git', '--version'],
249 stdout=subprocess.DEVNULL,
251 except FileNotFoundError:
252 abort('git command is not available. Please ensure git is installed.')
# Yield elements of an API filter operand: a bare string is treated as a
# one-element list; anything else is iterated as-is.
# NOTE(review): the docstring terminator and the yield/else body appear
# elided from this excerpt; retained byte-for-byte.
255 def filter_iter(arg):
256 """Iterate a filter string-or-list.
258 Pass in a filter field that can either be a string or list.
259 This will iterate elements as if the field had been written as a list.
261 if isinstance(arg, basestring):
# NOTE(review): the docstring terminator and the final `else:` line
# appear elided from this excerpt; retained byte-for-byte.
266 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
267 """Update a single repository filter in-place for the destination.
269 If the filter checks that the repository is src_repository, it is
270 updated to check that the repository is dst_repository. If it does
271 anything else, this function raises ValueError.
273 if src_repository is None:
274 raise ValueError("component does not specify a source repository")
275 elif dst_repository is None:
276 raise ValueError("no destination repository specified to update repository filter")
# Handle both the ["=", repo] and ["in", [repo]] filter shapes.
277 elif repo_filter[1:] == ['=', src_repository]:
278 repo_filter[2] = dst_repository
279 elif repo_filter[1:] == ['in', [src_repository]]:
280 repo_filter[2] = [dst_repository]
282 raise ValueError("repository filter is not a simple source match")
# NOTE(review): the docstring terminator appears elided from this
# excerpt; retained byte-for-byte.
284 def migrate_script_version_filter(version_filter):
285 """Update a single script_version filter in-place for the destination.
287 Currently this function checks that all the filter operands are Git
288 commit hashes. If they're not, it raises ValueError to indicate that
289 the filter is not portable. It could be extended to make other
290 transformations in the future.
# Every operand must look like a (possibly abbreviated) commit hash.
292 if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
293 raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    for name in filter_iter(filter_[0]):
        # A field of 'any' matches every attribute.
        if name == 'any' or name in attr_names:
            return True
    return False
# Context manager: runs the wrapped block and, on any of the listed
# exception types, passes the exception to `handler`.
# NOTE(review): the `try:`/`yield` lines appear elided from this
# excerpt; retained byte-for-byte.
300 @contextlib.contextmanager
301 def exception_handler(handler, *exc_types):
302 """If any exc_types are raised in the block, call handler on the exception."""
305 except exc_types as error:
309 # copy_workflow(wf_uuid, src, dst, args)
311 # Copies a workflow identified by wf_uuid from src to dst.
313 # If args.recursive is True, also copy any collections
314 # referenced in the workflow definition yaml.
316 # The owner_uuid of the new workflow is set to any given
317 # project_uuid or the user who copied the template.
319 # Returns the copied workflow object.
# NOTE(review): several scaffolding lines appear elided from this excerpt
# (the `try:` before subprocess.run, the body of the FileNotFoundError
# handler, the `if no_arv_copy:` test, and `else:` branches); retained
# byte-for-byte.
321 def copy_workflow(wf_uuid, src, dst, args):
322 # fetch the workflow from the source instance
323 wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
325 if not wf["definition"]:
326 logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
328 # copy collections and docker images
329 if args.recursive and wf["definition"]:
# Run arvados-cwl-runner against the *source* cluster to enumerate the
# Keep dependencies of the workflow definition.
330 env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
331 "ARVADOS_API_TOKEN": src.api_token,
332 "PATH": os.environ["PATH"]}
334 result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
335 capture_output=True, env=env)
336 except FileNotFoundError:
# Exit code 2 from arvados-cwl-runner indicates it does not support
# --print-keep-deps (too old).
339 no_arv_copy = result.returncode == 2
342 raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
343 elif result.returncode != 0:
344 raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')
346 locations = json.loads(result.stdout)
349 copy_collections(locations, src, dst, args)
351 # copy the workflow itself
353 wf['owner_uuid'] = args.project_uuid
# Update an existing workflow with the same name in the destination
# project, otherwise create a new one.
355 existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
356 ["name", "=", wf["name"]]]).execute()
357 if len(existing["items"]) == 0:
358 return dst.workflows().create(body=wf).execute(num_retries=args.retries)
360 return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
# Recursively walk a workflow definition structure, appending any
# "keep:" locations to `locations` and recording docker image tags in
# `docker_images` (both mutated in place).
# NOTE(review): some loop-header lines (`for x in obj:`) appear elided
# from this excerpt; retained byte-for-byte.
363 def workflow_collections(obj, locations, docker_images):
364 if isinstance(obj, dict):
365 loc = obj.get('location', None)
# Strip the "keep:" prefix before recording the locator.
367 if loc.startswith("keep:"):
368 locations.append(loc[5:])
370 docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
371 if docker_image is not None:
# Split "repo:tag", defaulting the tag to 'latest'.
372 ds = docker_image.split(":", 1)
373 tag = ds[1] if len(ds)==2 else 'latest'
374 docker_images[ds[0]] = tag
377 workflow_collections(obj[x], locations, docker_images)
378 elif isinstance(obj, list):
380 workflow_collections(x, locations, docker_images)
382 # copy_collections(obj, src, dst, args)
384 # Recursively copies all collections referenced by 'obj' from src
385 # to dst. obj may be a dict or a list, in which case we run
386 # copy_collections on every value it contains. If it is a string,
387 # search it for any substring that matches a collection hash or uuid
388 # (this will find hidden references to collections like
389 # "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
391 # Returns a copy of obj with any old collection uuids replaced by
# NOTE(review): the docstring terminator of the inner helper, a `return`
# after the string case, and the dict-comprehension continuation line
# appear elided from this excerpt; retained byte-for-byte.
394 def copy_collections(obj, src, dst, args):
396 def copy_collection_fn(collection_match):
397 """Helper function for regex substitution: copies a single collection,
398 identified by the collection_match MatchObject, to the
399 destination. Returns the destination collection uuid (or the
400 portable data hash if that's what src_id is).
403 src_id = collection_match.group(0)
# Memoize per-session so each source collection is copied only once.
404 if src_id not in collections_copied:
405 dst_col = copy_collection(src_id, src, dst, args)
406 if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
407 collections_copied[src_id] = src_id
409 collections_copied[src_id] = dst_col['uuid']
410 return collections_copied[src_id]
412 if isinstance(obj, basestring):
413 # Copy any collections identified in this string to dst, replacing
414 # them with the dst uuids as necessary.
415 obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
416 obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
418 elif isinstance(obj, dict):
419 return type(obj)((v, copy_collections(obj[v], src, dst, args))
421 elif isinstance(obj, list):
422 return type(obj)(copy_collections(v, src, dst, args) for v in obj)
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks).

    manifest_text: the collection's Keep manifest as a string. Each
    manifest line is a stream name followed by block locators and file
    tokens; only words that parse as KeepLocator contribute to the size.
    """
    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        # words[0] is the stream name; skip it.
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            # Count each distinct block only once, even if it appears
            # in several streams of the manifest.
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size
    return total_bytes
# NOTE(review): several lines appear elided from this excerpt (the
# docstring terminator, the `body = {}` initialization, the `if d in c:`
# copy inside the loop, and the `if not body["name"]:` guard); retained
# byte-for-byte.
445 def create_collection_from(c, src, dst, args):
446 """Create a new collection record on dst, and copy Docker metadata if
449 collection_uuid = c['uuid']
# Copy over the whitelisted collection attributes.
451 for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
455 body['name'] = "copied from " + collection_uuid
457 if args.storage_classes:
458 body['storage_classes_desired'] = args.storage_classes
460 body['owner_uuid'] = args.project_uuid
462 dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
464 # Create docker_image_repo+tag and docker_image_hash links
465 # at the destination.
466 for link_class in ("docker_image_repo+tag", "docker_image_hash"):
467 docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
469 for src_link in docker_links:
# Re-point each docker link at the newly created collection.
470 body = {key: src_link[key]
471 for key in ['link_class', 'name', 'properties']}
472 body['head_uuid'] = dst_collection['uuid']
473 body['owner_uuid'] = args.project_uuid
475 lk = dst.links().create(body=body).execute(num_retries=args.retries)
476 logger.debug('created dst link {}'.format(lk))
478 return dst_collection
480 # copy_collection(obj_uuid, src, dst, args)
482 # Copies the collection identified by obj_uuid from src to dst.
483 # Returns the collection object created at dst.
485 # If args.progress is True, produce a human-friendly progress
488 # If a collection with the desired portable_data_hash already
489 # exists at dst, and args.force is False, copy_collection returns
490 # the existing collection without copying any blocks. Otherwise
491 # (if no collection exists or if args.force is True)
492 # copy_collection copies all of the collection data blocks from src
495 # For this application, it is critical to preserve the
496 # collection's manifest hash, which is not guaranteed with the
497 # arvados.CollectionReader and arvados.CollectionWriter classes.
498 # Copying each block in the collection manually, followed by
499 # the manifest block, ensures that the collection's manifest
500 # hash will not change.
# NOTE(review): a large number of scaffolding lines appear elided from
# this excerpt (if/else branches in the PDH-selection heuristics, the
# `threadcount`/`dst_locators`/`transfer_error` initializations, the
# thread function `def` lines and their try/except/while scaffolding,
# queue-join calls, and `words = line.split()` lines); retained
# byte-for-byte below.
502 def copy_collection(obj_uuid, src, dst, args):
503 if arvados.util.keep_locator_pattern.match(obj_uuid):
504 # If the obj_uuid is a portable data hash, it might not be
505 # uniquely identified with a particular collection. As a
506 # result, it is ambiguous as to what name to use for the copy.
507 # Apply some heuristics to pick which collection to get the
509 srccol = src.collections().list(
510 filters=[['portable_data_hash', '=', obj_uuid]],
511 order="created_at asc"
512 ).execute(num_retries=args.retries)
514 items = srccol.get("items")
517 logger.warning("Could not find collection with portable data hash %s", obj_uuid)
523 # There's only one collection with the PDH, so use that.
526 # See if there is a collection that's in the same project
527 # as the root item (usually a workflow) being copied.
529 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
533 # Didn't find any collections located in the same project, so
534 # pick the oldest collection that has a name assigned to it.
540 # None of the collections have names (?!), so just pick the
544 # list() doesn't return manifest text (and we don't want it to,
545 # because we don't need the same manifest text sent to us 50
546 # times) so go and retrieve the collection object directly
547 # which will include the manifest text.
548 c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
550 # Assume this is an actual collection uuid, so fetch it directly.
551 c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
553 # If a collection with this hash already exists at the
554 # destination, and 'force' is not true, just return that
557 if 'portable_data_hash' in c:
558 colhash = c['portable_data_hash']
561 dstcol = dst.collections().list(
562 filters=[['portable_data_hash', '=', colhash]]
563 ).execute(num_retries=args.retries)
564 if dstcol['items_available'] > 0:
565 for d in dstcol['items']:
# Reuse the destination copy only when project, name, and PDH all match.
566 if ((args.project_uuid == d['owner_uuid']) and
567 (c.get('name') == d['name']) and
568 (c['portable_data_hash'] == d['portable_data_hash'])):
570 c['manifest_text'] = dst.collections().get(
571 uuid=dstcol['items'][0]['uuid']
572 ).execute(num_retries=args.retries)['manifest_text']
573 return create_collection_from(c, src, dst, args)
575 # Fetch the collection's manifest.
576 manifest = c['manifest_text']
577 logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
579 # Copy each block from src_keep to dst_keep.
580 # Use the newly signed locators returned from dst_keep to build
581 # a new manifest as we go.
582 src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
583 dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
584 dst_manifest = io.StringIO()
587 bytes_expected = total_collection_size(manifest)
589 progress_writer = ProgressWriter(human_progress)
591 progress_writer = None
593 # go through the words
594 # put each block loc into 'get' queue
595 # 'get' threads get block and put it into 'put' queue
596 # 'put' threads put block and then update dst_locators
598 # after going through the whole manifest we go back through it
599 # again and build dst_manifest
601 lock = threading.Lock()
603 # the get queue should be unbounded because we'll add all the
604 # block hashes we want to get, but these are small
605 get_queue = queue.Queue()
609 # the put queue contains full data blocks
610 # and if 'get' is faster than 'put' we could end up consuming
611 # a great deal of RAM if it isn't bounded.
612 put_queue = queue.Queue(threadcount)
617 word = get_queue.get()
620 get_queue.task_done()
623 blockhash = arvados.KeepLocator(word).md5sum
# Skip blocks that have already been transferred in this session.
625 if blockhash in dst_locators:
627 get_queue.task_done()
631 logger.debug("Getting block %s", word)
632 data = src_keep.get(word)
633 put_queue.put((word, data))
635 logger.error("Error getting block %s: %s", word, e)
636 transfer_error.append(e)
638 # Drain the 'get' queue so we end early
641 get_queue.task_done()
645 get_queue.task_done()
648 nonlocal bytes_written
650 item = put_queue.get()
652 put_queue.task_done()
656 loc = arvados.KeepLocator(word)
657 blockhash = loc.md5sum
659 if blockhash in dst_locators:
661 put_queue.task_done()
665 logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
666 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
668 dst_locators[blockhash] = dst_locator
669 bytes_written += loc.size
671 progress_writer.report(obj_uuid, bytes_written, bytes_expected)
673 logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
675 # Drain the 'get' queue so we end early
678 get_queue.task_done()
681 transfer_error.append(e)
683 put_queue.task_done()
# First pass over the manifest: enqueue every locator for transfer.
685 for line in manifest.splitlines():
687 for word in words[1:]:
689 loc = arvados.KeepLocator(word)
691 # If 'word' can't be parsed as a locator,
692 # presume it's a filename.
697 for i in range(0, threadcount):
700 for i in range(0, threadcount):
701 threading.Thread(target=get_thread, daemon=True).start()
703 for i in range(0, threadcount):
704 threading.Thread(target=put_thread, daemon=True).start()
709 if len(transfer_error) > 0:
710 return {"error_token": "Failed to transfer blocks"}
# Second pass: rebuild the manifest using the dst-signed locators.
712 for line in manifest.splitlines():
714 dst_manifest.write(words[0])
715 for word in words[1:]:
717 loc = arvados.KeepLocator(word)
719 # If 'word' can't be parsed as a locator,
720 # presume it's a filename.
721 dst_manifest.write(' ')
722 dst_manifest.write(word)
724 blockhash = loc.md5sum
725 dst_manifest.write(' ')
726 dst_manifest.write(dst_locators[blockhash])
727 dst_manifest.write("\n")
730 progress_writer.report(obj_uuid, bytes_written, bytes_expected)
731 progress_writer.finish()
733 # Copy the manifest and save the collection.
734 logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
736 c['manifest_text'] = dst_manifest.getvalue()
737 return create_collection_from(c, src, dst, args)
# Pick a usable clone URL for the named repository, preferring https,
# then non-http protocols, then plain http (which requires the
# allow_insecure_http flag). Returns (git_url, git_config) where
# git_config is the extra `git -c` credential arguments for http(s).
# NOTE(review): several lines appear elided from this excerpt (the
# `for url in priority:` loop header, the `git_config = []` else branch,
# the `try:`/subprocess.run scaffolding around `git ls-remote`, and the
# success/else handling); retained byte-for-byte.
739 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
740 r = api.repositories().list(
741 filters=[['name', '=', repo_name]]).execute(num_retries=retries)
742 if r['items_available'] != 1:
743 raise Exception('cannot identify repo {}; {} repos found'
744 .format(repo_name, r['items_available']))
746 https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
747 http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
748 other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
750 priority = https_url + other_url + http_url
753 if url.startswith("http"):
754 u = urllib.parse.urlsplit(url)
755 baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
# Supply the API token as the git password via a credential helper.
756 git_config = ["-c", "credential.%s/.username=none" % baseurl,
757 "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
762 logger.debug("trying %s", url)
764 ['git', *git_config, 'ls-remote', url],
767 'ARVADOS_API_TOKEN': api.api_token,
768 'GIT_ASKPASS': '/bin/false',
769 'HOME': os.environ['HOME'],
771 stdout=subprocess.DEVNULL,
773 except subprocess.CalledProcessError:
779 raise Exception('Cannot access git repository, tried {}'
# Refuse plain-http clone URLs unless explicitly allowed by the caller.
782 if git_url.startswith("http:"):
783 if allow_insecure_http:
784 logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
786 raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
788 return (git_url, git_config)
# NOTE(review): the docstring terminator and the final `else:` line
# appear elided from this excerpt; retained byte-for-byte.
791 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
792 """Copy the docker image identified by docker_image and
793 docker_image_tag from src to dst. Create appropriate
794 docker_image_repo+tag and docker_image_hash links at dst.
798 logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
800 # Find the link identifying this docker image.
801 docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
802 src, args.retries, docker_image, docker_image_tag)
803 if docker_image_list:
804 image_uuid, image_info = docker_image_list[0]
805 logger.debug('copying collection {} {}'.format(image_uuid, image_info))
807 # Copy the collection it refers to.
808 dst_image_col = copy_collection(image_uuid, src, dst, args)
# Fall back: a raw Keep locator can be copied directly as a collection.
809 elif arvados.util.keep_locator_pattern.match(docker_image):
810 dst_image_col = copy_collection(docker_image, src, dst, args)
812 logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
# Copy the project identified by obj_uuid (and, recursively, its
# collections, workflows, and subprojects) from src to dst under
# owner_uuid. Errors on individual items are accumulated in
# partial_error rather than aborting the whole copy.
# NOTE(review): several lines appear elided from this excerpt (the
# `else:` before reusing an existing project, the `partial_error = ""`
# initialization, the `try:` lines inside each loop, and the
# `if partial_error:` guard); retained byte-for-byte.
814 def copy_project(obj_uuid, src, dst, owner_uuid, args):
816 src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
818 # Create/update the destination project
819 existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
820 ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
821 if len(existing["items"]) == 0:
822 project_record = dst.groups().create(body={"group": {"group_class": "project",
823 "owner_uuid": owner_uuid,
824 "name": src_project_record["name"]}}).execute(num_retries=args.retries)
826 project_record = existing["items"][0]
828 dst.groups().update(uuid=project_record["uuid"],
830 "description": src_project_record["description"]}}).execute(num_retries=args.retries)
# Subsequent copies land in the newly created/updated project.
832 args.project_uuid = project_record["uuid"]
834 logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
841 copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
843 except Exception as e:
844 partial_error += "\n" + str(e)
847 for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
849 copy_workflow(w["uuid"], src, dst, args)
850 except Exception as e:
851 partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
854 for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
856 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
857 except Exception as e:
858 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
860 project_record["partial_error"] = partial_error
862 return project_record
# git_rev_parse(rev, repo)
#
# Returns the 40-character commit hash corresponding to 'rev' in
# git repository 'repo' (which must be the path of a local git
# repository).
def git_rev_parse(rev, repo):
    """Resolve 'rev' to a full commit hash in the local repo at 'repo'.

    Raises subprocess.CalledProcessError if git exits nonzero.
    """
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        check=True,
        cwd=repo,
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    # subprocess.run captures output as a plain string on
    # CompletedProcess.stdout — it is not a file object, so the original
    # `proc.stdout.read()` would raise AttributeError.
    return proc.stdout.strip()
880 # uuid_type(api, object_uuid)
882 # Returns the name of the class that object_uuid belongs to, based on
883 # the second field of the uuid. This function consults the api's
884 # schema to identify the object class.
886 # It returns a string such as 'Collection', 'Workflow', etc.
888 # Special case: if handed a Keep locator hash, return 'Collection'.
# NOTE(review): the return statements for the special cases (Keep
# locator, http(s) URL), the `type_prefix = p[1]` assignment, and the
# final return lines appear elided from this excerpt; retained
# byte-for-byte.
890 def uuid_type(api, object_uuid):
891 if re.match(arvados.util.keep_locator_pattern, object_uuid):
894 if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
# The second dash-separated field of an Arvados UUID encodes its type.
897 p = object_uuid.split('-')
900 for k in api._schema.schemas:
901 obj_class = api._schema.schemas[k].get('uuidPrefix', None)
902 if type_prefix == obj_class:
def copy_from_http(url, src, dst, args):
    """Copy the contents of an HTTP(S) URL into Keep on the destination.

    First checks whether the URL is already cached in Keep on the source
    cluster; if so, that collection is copied to dst. Otherwise the URL
    is downloaded straight into Keep on the destination cluster.
    """
    project_uuid = args.project_uuid
    varying_url_params = args.varying_url_params
    prefer_cached_downloads = args.prefer_cached_downloads

    # Already stored in Keep on the source side? Copy that collection.
    cached = arvados.http_to_keep.check_cached_url(
        src, project_uuid, url, {},
        varying_url_params=varying_url_params,
        prefer_cached_downloads=prefer_cached_downloads)
    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    # Not cached: fetch the URL directly into Keep at the destination.
    cached = arvados.http_to_keep.http_to_keep(
        dst, project_uuid, url,
        varying_url_params=varying_url_params,
        prefer_cached_downloads=prefer_cached_downloads)
    if cached is not None:
        return {"uuid": cached[2]}
# Log a fatal message and terminate the program with the given exit code.
# NOTE(review): the `exit(code)` line appears elided from this excerpt;
# also, logging at INFO level for a fatal condition looks questionable —
# confirm against the full source.
927 def abort(msg, code=1):
928 logger.info("arv-copy: %s", msg)
932 # Code for reporting on the progress of a collection upload.
933 # Stolen from arvados.commands.put.ArvPutCollectionWriter
934 # TODO(twp): figure out how to refactor into a shared library
935 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
# Machine-readable progress line; -1 stands in for an unknown total.
# NOTE(review): the format-argument lines appear elided from this
# excerpt; retained byte-for-byte.
938 def machine_progress(obj_uuid, bytes_written, bytes_expected):
939 return "{} {}: {} {} written {} total\n".format(
944 -1 if (bytes_expected is None) else bytes_expected)
# Human-readable one-line progress string (carriage-return overwrite).
# NOTE(review): the `if bytes_expected:`/`else:` scaffolding and the
# obj_uuid format argument line appear elided from this excerpt;
# retained byte-for-byte.
946 def human_progress(obj_uuid, bytes_written, bytes_expected):
948 return "\r{}: {}M / {}M {:.1%} ".format(
950 bytes_written >> 20, bytes_expected >> 20,
951 float(bytes_written) / bytes_expected)
952 # Fallback when the expected total is unknown: report raw bytes only.
953 return "\r{}: {} ".format(obj_uuid, bytes_written)
# Writes progress reports produced by a formatting function (e.g.
# human_progress) to an output stream.
# NOTE(review): the `outfile` attribute assignment, the
# `self.outfile.write(...)` call inside report(), and the `def finish`
# line appear elided from this excerpt; retained byte-for-byte.
955 class ProgressWriter(object):
956 _progress_func = None
959 def __init__(self, progress_func):
960 self._progress_func = progress_func
962 def report(self, obj_uuid, bytes_written, bytes_expected):
963 if self._progress_func is not None:
965 self._progress_func(obj_uuid, bytes_written, bytes_expected))
# finish(): terminate the progress line with a newline.
968 self.outfile.write("\n")
970 if __name__ == '__main__':