1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 # arv-copy [--recursive] [--no-recursive] object-uuid
7 # Copies an object from Arvados instance src to instance dst.
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst. If either of these files is not found,
19 # arv-copy will issue an error.
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
47 import arvados.commands._util as arv_cmd
48 import arvados.commands.keepdocker
49 import arvados.http_to_keep
50 import ruamel.yaml as yaml
52 from arvados._version import __version__
# Regex matching a (possibly abbreviated) lowercase hex git commit hash.
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# local directory they were cloned:
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
# NOTE(review): the local_repo_dir = {} assignment is on a line not
# visible in this elided view.

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
# NOTE(review): elided view — in the upstream file these statements are
# the option-parsing part of the command-line entry point; the enclosing
# function definition is not visible in this chunk.

# arv-copy-specific options, built with add_help=False so they can be
# combined below with the shared retry options.
copy_opts = argparse.ArgumentParser(add_help=False)

copy_opts.add_argument(
    '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
copy_opts.add_argument(
    '-v', '--verbose', dest='verbose', action='store_true',
    help='Verbose output.')
copy_opts.add_argument(
    '--progress', dest='progress', action='store_true',
    help='Report progress on copying collections. (default)')
copy_opts.add_argument(
    '--no-progress', dest='progress', action='store_false',
    help='Do not report progress on copying collections.')
copy_opts.add_argument(
    '-f', '--force', dest='force', action='store_true',
    help='Perform copy even if the object appears to exist at the remote destination.')
copy_opts.add_argument(
    '--src', dest='source_arvados',
    help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will be inferred from the UUID of the object being copied.')
copy_opts.add_argument(
    '--dst', dest='destination_arvados',
    help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will use ARVADOS_API_HOST from environment.')
copy_opts.add_argument(
    '--recursive', dest='recursive', action='store_true',
    help='Recursively copy any dependencies for this object, and subprojects. (default)')
copy_opts.add_argument(
    '--no-recursive', dest='recursive', action='store_false',
    help='Do not copy any dependencies or subprojects.')
copy_opts.add_argument(
    '--project-uuid', dest='project_uuid',
    help='The UUID of the project at the destination to which the collection or workflow should be copied.')
copy_opts.add_argument(
    '--storage-classes', dest='storage_classes',
    # NOTE(review): "destinaton" is a typo in this user-facing help
    # string; left untouched here because this edit only adds comments.
    help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
copy_opts.add_argument("--varying-url-params", type=str, default="",
                       help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")

copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
                       help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")

copy_opts.add_argument(
    # NOTE(review): the positional argument name (the object-uuid
    # operand) is on a line not visible in this elided view.
    help='The UUID of the object to be copied.')
copy_opts.set_defaults(progress=True)
copy_opts.set_defaults(recursive=True)

# Combine with the shared options (e.g. --retries) and parse argv.
parser = argparse.ArgumentParser(
    description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
    parents=[copy_opts, arv_cmd.retry_opt])
args = parser.parse_args()
# NOTE(review): elided view — several conditions and else: branches in
# this stretch are not visible in this chunk.

# Normalize --storage-classes into a list of non-empty class names,
# tolerating stray spaces around the commas.
if args.storage_classes:
    args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]

# Logging level follows --verbose; the if/else lines selecting between
# these two calls are elided in this view.
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.INFO)

# If no --src was given, infer the source cluster from the first five
# characters of the object UUID (the cluster id prefix).
if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
    args.source_arvados = args.object_uuid[:5]

# Create API clients for the source and destination instances
src_arv = api_for_instance(args.source_arvados, args.retries)
dst_arv = api_for_instance(args.destination_arvados, args.retries)

# Default the destination project to the current user's home project.
if not args.project_uuid:
    args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

# Identify the kind of object we have been given, and begin copying.
# NOTE(review): the elif/else lines for the Group / HTTP / unknown
# branches below are elided, as is the tail of the copy_collection call.
t = uuid_type(src_arv, args.object_uuid)
if t == 'Collection':
    set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
    result = copy_collection(args.object_uuid,
elif t == 'Workflow':
    set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
    result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
    set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
    result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
    result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
    abort("cannot copy object {} of type {}".format(args.object_uuid, t))

# Clean up any outstanding temp git repositories.
for d in listvalues(local_repo_dir):
    shutil.rmtree(d, ignore_errors=True)

# If no exception was thrown and the response does not have an
# error_token field, presume success
if result is None or 'error_token' in result or 'uuid' not in result:
    logger.error("API server returned an error result: {}".format(result))

# Print the destination uuid to stdout so callers can capture it.
print(result['uuid'])

if result.get('partial_error'):
    logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))

logger.info("Success: created copy with uuid {}".format(result['uuid']))
def set_src_owner_uuid(resource, uuid, args):
    """Fetch the record for `uuid` via `resource` and stash its
    owner_uuid in the module-global src_owner_uuid."""
    global src_owner_uuid
    record = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name.
#
# If instance_name contains a slash, it is presumed to be a path
# (either local or absolute) to a file with Arvados configuration
# settings.
#
# Otherwise, it is presumed to be the name of a file in
# $HOME/.config/arvados/instance_name.conf
#
def api_for_instance(instance_name, num_retries):
    # NOTE(review): elided view — an else: branch, a try: opener, and
    # the api_is_insecure assignment line are not visible in this chunk.
    if not instance_name:
        # No name given: use the default client configured from the
        # ARVADOS_API_* environment variables.
        return arvados.api('v1')

    if '/' in instance_name:
        config_file = instance_name
        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        # Treat the usual truthy spellings of ARVADOS_API_HOST_INSECURE
        # as a request to skip TLS certificate verification.
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             num_retries=num_retries,
                             )
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
# Check if git is available
def check_git_availability():
    # Probe for a working `git` executable by running `git --version`;
    # abort with a user-facing message if the binary is missing.
    # NOTE(review): elided view — the try: and subprocess.run( opener
    # lines are not visible in this chunk.
        ['git', '--version'],
        stdout=subprocess.DEVNULL,
    except FileNotFoundError:
        abort('git command is not available. Please ensure git is installed.')
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    # NOTE(review): elided view — the yield statements (string yielded
    # whole, list yielded element-wise) are not visible in this chunk.
    if isinstance(arg, basestring):
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository. If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        # Simple equality match: rewrite the operand.
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        # Single-element 'in' match: rewrite the operand list.
        repo_filter[2] = [dst_repository]
        # NOTE(review): the else: introducing this raise is on a line
        # not visible in this elided view.
        raise ValueError("repository filter is not a simple source match")
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes. If they're not, it raises ValueError to indicate that
    the filter is not portable. It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    for field_name in filter_iter(filter_[0]):
        if field_name == 'any' or field_name in attr_names:
            return True
    return False
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    # NOTE(review): elided view — the try:/yield lines and the call to
    # handler(error) are not visible in this chunk.
    except exc_types as error:
# copy_workflow(wf_uuid, src, dst, args)
#
# Copies a workflow identified by wf_uuid from src to dst.
#
# If args.recursive is True, also copy any collections
# referenced in the workflow definition yaml.
#
# The owner_uuid of the new workflow is set to any given
# project_uuid or the user who copied the template.
#
# Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # NOTE(review): elided view — the locations/docker_images
    # accumulator initializations and an else: branch are not visible
    # in this chunk.
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    if not wf["definition"]:
        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))

    # copy collections and docker images
    if args.recursive and wf["definition"]:
        wf_def = yaml.safe_load(wf["definition"])
        if wf_def is not None:
            # Packed workflows keep their tool list under $graph;
            # otherwise scan the document root.
            graph = wf_def.get('$graph', None)
            if graph is not None:
                workflow_collections(graph, locations, docker_images)
                workflow_collections(wf_def, locations, docker_images)

            copy_collections(locations, src, dst, args)

            for image in docker_images:
                copy_docker_image(image, docker_images[image], src, dst, args)

    # copy the workflow itself
    wf['owner_uuid'] = args.project_uuid

    # Update a same-named workflow in the target project if one exists,
    # otherwise create a new record.
    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
                                             ["name", "=", wf["name"]]]).execute()
    if len(existing["items"]) == 0:
        return dst.workflows().create(body=wf).execute(num_retries=args.retries)

    return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
def workflow_collections(obj, locations, docker_images):
    # Recursively walk a workflow definition structure, appending
    # "keep:..." location references (with the prefix stripped) to
    # `locations` and recording docker image name -> tag pairs in
    # `docker_images`.
    # NOTE(review): elided view — the for-loops that drive the
    # recursive calls are not visible in this chunk.
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc.startswith("keep:"):
            locations.append(loc[5:])

        # Any of these keys may name the container image.
        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            # Default to the 'latest' tag when none is given.
            tag = ds[1] if len(ds)==2 else 'latest'
            docker_images[ds[0]] = tag

            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
            workflow_collections(x, locations, docker_images)
# copy_collections(obj, src, dst, args)
#
# Recursively copies all collections referenced by 'obj' from src
# to dst.  obj may be a dict or a list, in which case we run
# copy_collections on every value it contains. If it is a string,
# search it for any substring that matches a collection hash or uuid
# (this will find hidden references to collections like
# "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
# Returns a copy of obj with any old collection uuids replaced by
# the new destination identifiers.
#
def copy_collections(obj, src, dst, args):
    # NOTE(review): elided view — an else: branch in the helper and the
    # final return for the string case are not visible in this chunk.

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination. Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                # Same identifier at the destination: cache it as-is.
                collections_copied[src_id] = src_id

            collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)

    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""
    # NOTE(review): elided view — the accumulator initializations, the
    # line tokenization, the try/except around KeepLocator parsing, and
    # the final return are not visible in this chunk.
    for line in manifest_text.splitlines():
        for word in words[1:]:
            loc = arvados.KeepLocator(word)
            continue # this word isn't a locator, skip it
            # Count each distinct block (by md5) exactly once.
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""
    # NOTE(review): elided view — the construction of `body` from
    # selected fields of `c` (and the fallback-name condition) is only
    # partially visible in this chunk.
    collection_uuid = c['uuid']

    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):

        # Fallback name when the source collection had none.
        body['name'] = "copied from " + collection_uuid

    if args.storage_classes:
        body['storage_classes_desired'] = args.storage_classes

    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
# copy_collection(obj_uuid, src, dst, args)
#
# Copies the collection identified by obj_uuid from src to dst.
# Returns the collection object created at dst.
#
# If args.progress is True, produce a human-friendly progress
# report.
#
# If a collection with the desired portable_data_hash already
# exists at dst, and args.force is False, copy_collection returns
# the existing collection without copying any blocks. Otherwise
# (if no collection exists or if args.force is True)
# copy_collection copies all of the collection data blocks from src
# to dst.
#
# For this application, it is critical to preserve the
# collection's manifest hash, which is not guaranteed with the
# arvados.CollectionReader and arvados.CollectionWriter classes.
# Copying each block in the collection manually, followed by
# the manifest block, ensures that the collection's manifest
# hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    # NOTE(review): elided view — many statements are not visible in
    # this chunk: try/except openers, else: branches, the nested
    # get_thread/put_thread def lines, accumulator initializations
    # (dst_locators, bytes_written, transfer_error), manifest word
    # splitting, and the queue join calls.
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection. As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
            ).execute(num_retries=args.retries)

        items = srccol.get("items")

            logger.warning("Could not find collection with portable data hash %s", obj_uuid)

            # There's only one collection with the PDH, so use that.

            # See if there is a collection that's in the same project
            # as the root item (usually a workflow) being copied.
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):

            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.

                # None of the collections have names (?!), so just pick the
                # first one.

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)

        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if 'portable_data_hash' in c:
        colhash = c['portable_data_hash']

        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                # An exact match (same owner, name, and hash) means the
                # destination already has this copy.
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):

            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = io.StringIO()

    bytes_expected = total_collection_size(manifest)

        progress_writer = ProgressWriter(human_progress)

        progress_writer = None

    # go through the words
    # put each block loc into 'get' queue
    # 'get' threads get block and put it into 'put' queue
    # 'put' threads put block and then update dst_locators
    #
    # after going through the whole manifest we go back through it
    # again and build dst_manifest

    lock = threading.Lock()
    get_queue = queue.Queue()
    put_queue = queue.Queue()

            word = get_queue.get()

                get_queue.task_done()

            blockhash = arvados.KeepLocator(word).md5sum

                # Skip blocks already uploaded to the destination.
                if blockhash in dst_locators:

                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))

                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)

                    # Drain the 'get' queue so we end early

                get_queue.task_done()

            item = put_queue.get()

                put_queue.task_done()

            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum

                # Another put thread may have uploaded this block already.
                if blockhash in dst_locators:

                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))

                    dst_locators[blockhash] = dst_locator
                    bytes_written[0] += loc.size

                        progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)

                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)

                    # Drain the 'get' queue so we end early

                    transfer_error.append(e)

                put_queue.task_done()

    # First pass: enqueue every block locator found in the manifest.
    for line in manifest.splitlines():
        for word in words[1:]:
                loc = arvados.KeepLocator(word)
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.

    # Four parallel fetchers feeding four parallel uploaders.
    threading.Thread(target=get_thread, daemon=True).start()
    threading.Thread(target=get_thread, daemon=True).start()
    threading.Thread(target=get_thread, daemon=True).start()
    threading.Thread(target=get_thread, daemon=True).start()

    threading.Thread(target=put_thread, daemon=True).start()
    threading.Thread(target=put_thread, daemon=True).start()
    threading.Thread(target=put_thread, daemon=True).start()
    threading.Thread(target=put_thread, daemon=True).start()

    if len(transfer_error) > 0:
        return {"error_token": "Failed to transfer blocks"}

    # Second pass: rebuild the manifest, substituting each source
    # locator with the freshly signed destination locator.
    for line in manifest.splitlines():
        dst_manifest.write(words[0])
        for word in words[1:]:
                loc = arvados.KeepLocator(word)
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest.write(' ')
                dst_manifest.write(word)

            blockhash = loc.md5sum
            dst_manifest.write(' ')
            dst_manifest.write(dst_locators[blockhash])
        dst_manifest.write("\n")

        progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)
def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    """Look up repo_name on the given API server and pick a usable clone
    URL, trying https first, then non-http schemes, then plain http.

    Returns (git_url, git_config) where git_config is a list of extra
    `git -c` options used to authenticate against that URL.
    """
    # NOTE(review): elided view — the for-loop over candidate URLs, the
    # try: around the `git ls-remote` probe, and several else: branches
    # are not visible in this chunk.
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    # Preference order: https, then other schemes (e.g. ssh), then http.
    priority = https_url + other_url + http_url

        if url.startswith("http"):
            u = urllib.parse.urlsplit(url)
            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
            # Feed the API token as the git password through a one-shot
            # credential helper; GIT_ASKPASS=/bin/false prevents prompts.
            git_config = ["-c", "credential.%s/.username=none" % baseurl,
                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]

            logger.debug("trying %s", url)
                ['git', *git_config, 'ls-remote', url],
                    'ARVADOS_API_TOKEN': api.api_token,
                    'GIT_ASKPASS': '/bin/false',
                    'HOME': os.environ['HOME'],
                stdout=subprocess.DEVNULL,
            except subprocess.CalledProcessError:

    raise Exception('Cannot access git repository, tried {}'

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.
    """
    # NOTE(review): elided view — the else: introducing the final
    # warning is not visible in this chunk.
    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        # A bare Keep locator was given instead of an image name.
        dst_image_col = copy_collection(docker_image, src, dst, args)

        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
def copy_project(obj_uuid, src, dst, owner_uuid, args):
    """Copy the project obj_uuid from src to dst under owner_uuid,
    recursing into its collections, workflows and subprojects.

    Returns the destination project record; when some items fail to
    copy, the accumulated messages are attached as "partial_error"."""
    # NOTE(review): elided view — the partial_error initialization, the
    # try: openers, and parts of the update body are not visible in
    # this chunk.
    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)

        project_record = existing["items"][0]

    dst.groups().update(uuid=project_record["uuid"],
                        "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    # Subsequent copies land inside the newly created/updated project.
    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
    except Exception as e:
        partial_error += "\n" + str(e)

    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
            copy_project(g["uuid"], src, dst, project_record["uuid"], args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    project_record["partial_error"] = partial_error

    return project_record
# git_rev_parse(rev, repo)
#
# Returns the 40-character commit hash corresponding to 'rev' in
# git repository 'repo' (which must be the path of a local git
# repository)
#
def git_rev_parse(rev, repo):
    """Return the full commit hash for `rev` in the local git repository
    at path `repo`.

    Raises subprocess.CalledProcessError if git exits nonzero
    (check=True), or FileNotFoundError if git is not installed.
    """
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        check=True,
        cwd=repo,
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    # Bug fix: subprocess.run() exposes the captured output directly as
    # a string on CompletedProcess.stdout — it is not a file object, so
    # the previous `proc.stdout.read().strip()` raised AttributeError.
    return proc.stdout.strip()
# uuid_type(api, object_uuid)
#
# Returns the name of the class that object_uuid belongs to, based on
# the second field of the uuid. This function consults the api's
# schema to identify the object class.
#
# It returns a string such as 'Collection', 'Workflow', etc.
#
# Special case: if handed a Keep locator hash, return 'Collection'.
#
def uuid_type(api, object_uuid):
    # NOTE(review): elided view — the return statements for the special
    # cases and the type_prefix extraction are not visible in this chunk.
    if re.match(arvados.util.keep_locator_pattern, object_uuid):

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):

    p = object_uuid.split('-')
    # Match the uuid's type prefix against each schema class's
    # uuidPrefix to find the class name.
    for k in api._schema.schemas:
        obj_class = api._schema.schemas[k].get('uuidPrefix', None)
        if type_prefix == obj_class:
def copy_from_http(url, src, dst, args):
    """Copy the contents of an HTTP(S) URL into a collection at dst.

    If src already has the URL cached in Keep, copy that cached
    collection to dst; otherwise download the URL directly into dst
    via arvados.http_to_keep."""
    # NOTE(review): elided view — the final fallthrough (when neither
    # branch returns) is not visible in this chunk.
    project_uuid = args.project_uuid
    varying_url_params = args.varying_url_params
    prefer_cached_downloads = args.prefer_cached_downloads

    # See whether the source cluster already has this URL in Keep.
    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
                                                   varying_url_params=varying_url_params,
                                                   prefer_cached_downloads=prefer_cached_downloads)
    if cached[2] is not None:
        # cached[2] identifies the cached collection; copy it across.
        return copy_collection(cached[2], src, dst, args)

    # Not cached at the source: fetch the URL straight into the
    # destination cluster.
    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
                                               varying_url_params=varying_url_params,
                                               prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}
def abort(msg, code=1):
    # Log an arv-copy error message and terminate.
    # NOTE(review): the exit(code) call that ends the process is on a
    # line not visible in this elided view.
    logger.info("arv-copy: %s", msg)
# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    # Machine-readable, newline-terminated progress record; a missing
    # total is reported as -1.
    # NOTE(review): elided view — several of the format arguments are
    # not visible in this chunk.
    return "{} {}: {} {} written {} total\n".format(
        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
    # Single-line, carriage-return-prefixed progress display: MB counts
    # and a percentage when the total is known, raw bytes otherwise.
    # NOTE(review): elided view — the if/else selecting between these
    # two returns is not visible in this chunk.
    return "\r{}: {}M / {}M {:.1%} ".format(
        bytes_written >> 20, bytes_expected >> 20,
        float(bytes_written) / bytes_expected)

    return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter(object):
    # Wraps a progress-formatting callable (e.g. human_progress or
    # machine_progress) and writes its output as a transfer proceeds.
    # NOTE(review): elided view — the outfile assignment, the write call
    # in report(), and the def line of the final method are not visible
    # in this chunk.

    # Formatting callable; None disables reporting.
    _progress_func = None

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

        # Terminate the in-place progress line with a newline.
        self.outfile.write("\n")
947 if __name__ == '__main__':