1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 # arv-copy [--recursive] [--no-recursive] object-uuid
7 # Copies an object from Arvados instance src to instance dst.
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst. If either of these files is not found,
19 # arv-copy will issue an error.
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
47 import arvados.commands._util as arv_cmd
48 import arvados.commands.keepdocker
49 import arvados.http_to_keep
50 import ruamel.yaml as yaml
52 from arvados.api import OrderedJsonModel
53 from arvados._version import __version__
# Matches full or abbreviated lowercase-hex git commit hashes (1-40 chars).
55 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
57 logger = logging.getLogger('arvados.arv-copy')
59 # local_repo_dir records which git repositories from the Arvados source
60 # instance have been checked out locally during this run, and to which
62 # e.g. if repository 'twp' from src_arv has been cloned into
63 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
67 # List of collections that have been copied in this session, and their
68 # destination collection UUIDs.
# Maps source collection uuid/PDH -> destination uuid (see copy_collections).
69 collections_copied = {}
71 # Set of (repository, script_version) two-tuples of commits copied in git.
72 scripts_copied = set()
74 # The owner_uuid of the object being copied
# (set by set_src_owner_uuid; used by copy_collection's PDH heuristics)
# Build the arv-copy argument parser.  copy_opts holds the options specific
# to arv-copy; the shared retry options are merged in via arv_cmd.retry_opt.
78 copy_opts = argparse.ArgumentParser(add_help=False)
80 copy_opts.add_argument(
81 '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
82 help='Print version and exit.')
83 copy_opts.add_argument(
84 '-v', '--verbose', dest='verbose', action='store_true',
85 help='Verbose output.')
86 copy_opts.add_argument(
87 '--progress', dest='progress', action='store_true',
88 help='Report progress on copying collections. (default)')
89 copy_opts.add_argument(
90 '--no-progress', dest='progress', action='store_false',
91 help='Do not report progress on copying collections.')
92 copy_opts.add_argument(
93 '-f', '--force', dest='force', action='store_true',
94 help='Perform copy even if the object appears to exist at the remote destination.')
95 copy_opts.add_argument(
96 '--src', dest='source_arvados',
97 help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will be inferred from the UUID of the object being copied.')
98 copy_opts.add_argument(
99 '--dst', dest='destination_arvados',
100 help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will use ARVADOS_API_HOST from environment.')
101 copy_opts.add_argument(
102 '--recursive', dest='recursive', action='store_true',
103 help='Recursively copy any dependencies for this object, and subprojects. (default)')
104 copy_opts.add_argument(
105 '--no-recursive', dest='recursive', action='store_false',
106 help='Do not copy any dependencies or subprojects.')
107 copy_opts.add_argument(
108 '--project-uuid', dest='project_uuid',
109 help='The UUID of the project at the destination to which the collection or workflow should be copied.')
110 copy_opts.add_argument(
111 '--storage-classes', dest='storage_classes',
112 help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
113 copy_opts.add_argument("--varying-url-params", type=str, default="",
114 help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
116 copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
117 help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
# NOTE(review): the positional argument name line is not visible here —
# presumably 'object_uuid'; args.object_uuid is used below.
119 copy_opts.add_argument(
121 help='The UUID of the object to be copied.')
122 copy_opts.set_defaults(progress=True)
123 copy_opts.set_defaults(recursive=True)
125 parser = argparse.ArgumentParser(
126 description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
127 parents=[copy_opts, arv_cmd.retry_opt])
128 args = parser.parse_args()
# Normalize "--storage-classes a, b" into a clean list, dropping empties.
130 if args.storage_classes:
131 args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
# Log level selection (the surrounding if/else on args.verbose is elided here).
134 logger.setLevel(logging.DEBUG)
136 logger.setLevel(logging.INFO)
# Infer the source cluster id from the first 5 characters of the object uuid.
138 if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
139 args.source_arvados = args.object_uuid[:5]
141 # Create API clients for the source and destination instances
142 src_arv = api_for_instance(args.source_arvados, args.retries)
143 dst_arv = api_for_instance(args.destination_arvados, args.retries)
# Default destination project: the current user's home project on dst.
145 if not args.project_uuid:
146 args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
148 # Identify the kind of object we have been given, and begin copying.
149 t = uuid_type(src_arv, args.object_uuid)
# Dispatch on object type (the enclosing try: and else:/elif lines for the
# Group and HTTP-URL cases are elided from this view).
152 if t == 'Collection':
153 set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
154 result = copy_collection(args.object_uuid,
157 elif t == 'Workflow':
158 set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
159 result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
161 set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
162 result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
164 result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
166 abort("cannot copy object {} of type {}".format(args.object_uuid, t))
167 except Exception as e:
# With --verbose, include the traceback in the error report.
168 logger.error("%s", e, exc_info=args.verbose)
171 # Clean up any outstanding temp git repositories.
172 for d in listvalues(local_repo_dir):
173 shutil.rmtree(d, ignore_errors=True)
175 # If no exception was thrown and the response does not have an
176 # error_token field, presume success
177 if result is None or 'error_token' in result or 'uuid' not in result:
179 logger.error("API server returned an error result: {}".format(result))
# On success the copied object's uuid is printed to stdout for scripting.
182 print(result['uuid'])
184 if result.get('partial_error'):
185 logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
188 logger.info("Success: created copy with uuid {}".format(result['uuid']))
def set_src_owner_uuid(resource, uuid, args):
    """Record the owner_uuid of the root object being copied.

    Fetches the record identified by *uuid* through the given API
    resource and stores its owner_uuid in the module-level
    src_owner_uuid global, which copy_collection later consults.
    """
    global src_owner_uuid
    record = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
196 # api_for_instance(instance_name)
198 # Creates an API client for the Arvados instance identified by
201 # If instance_name contains a slash, it is presumed to be a path
202 # (either local or absolute) to a file with Arvados configuration
205 # Otherwise, it is presumed to be the name of a file in
206 # $HOME/.config/arvados/instance_name.conf
208 def api_for_instance(instance_name, num_retries):
# With no instance name, fall back to the default environment-configured client.
209 if not instance_name:
211 return arvados.api('v1', model=OrderedJsonModel())
# A slash means the caller gave us a config-file path directly.
213 if '/' in instance_name:
214 config_file = instance_name
216 config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
# (the enclosing try: line is elided from this view)
219 cfg = arvados.config.load(config_file)
220 except (IOError, OSError) as e:
221 abort(("Could not open config file {}: {}\n" +
222 "You must make sure that your configuration tokens\n" +
223 "for Arvados instance {} are in {} and that this\n" +
224 "file is readable.").format(
225 config_file, e, instance_name, config_file))
# Build a client from the host/token in the config file.
227 if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
# Accept several common truthy spellings for the "insecure" flag.
229 cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
230 ['1', 't', 'true', 'y', 'yes']))
231 client = arvados.api('v1',
232 host=cfg['ARVADOS_API_HOST'],
233 token=cfg['ARVADOS_API_TOKEN'],
234 insecure=api_is_insecure,
235 model=OrderedJsonModel(),
236 num_retries=num_retries,
239 abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
242 # Check if git is available
# Aborts the program with a helpful message if the `git` executable
# cannot be found on PATH (the try:/subprocess.run lines are elided here).
243 def check_git_availability():
246 ['git', '--version'],
247 stdout=subprocess.DEVNULL,
248 stdout=subprocess.DEVNULL,
250 except FileNotFoundError:
251 abort('git command is not available. Please ensure git is installed.')
# Yields a single string unchanged, or each element of a list, so callers
# can treat a filter field uniformly (yield lines are elided from this view).
254 def filter_iter(arg):
255 """Iterate a filter string-or-list.
257 Pass in a filter field that can either be a string or list.
258 This will iterate elements as if the field had been written as a list.
260 if isinstance(arg, basestring):
265 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
266 """Update a single repository filter in-place for the destination.
268 If the filter checks that the repository is src_repository, it is
269 updated to check that the repository is dst_repository. If it does
270 anything else, this function raises ValueError.
272 if src_repository is None:
273 raise ValueError("component does not specify a source repository")
274 elif dst_repository is None:
275 raise ValueError("no destination repository specified to update repository filter")
# Rewrite ["repository", "=", src] -> ["repository", "=", dst] in place.
276 elif repo_filter[1:] == ['=', src_repository]:
277 repo_filter[2] = dst_repository
# Rewrite ["repository", "in", [src]] -> ["repository", "in", [dst]].
278 elif repo_filter[1:] == ['in', [src_repository]]:
279 repo_filter[2] = [dst_repository]
281 raise ValueError("repository filter is not a simple source match")
283 def migrate_script_version_filter(version_filter):
284 """Update a single script_version filter in-place for the destination.
286 Currently this function checks that all the filter operands are Git
287 commit hashes. If they're not, it raises ValueError to indicate that
288 the filter is not portable. It could be extended to make other
289 transformations in the future.
# Branch/tag names are not portable across clusters; only commit hashes pass.
291 if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
292 raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    # A filter field of 'any' matches every attribute name.
    for name in filter_iter(filter_[0]):
        if name == 'any' or name in attr_names:
            return True
    return False
# Context manager: the try:/yield lines are elided from this view; the
# visible except clause forwards matching exceptions to the handler callback.
299 @contextlib.contextmanager
300 def exception_handler(handler, *exc_types):
301 """If any exc_types are raised in the block, call handler on the exception."""
304 except exc_types as error:
308 # copy_workflow(wf_uuid, src, dst, args)
310 # Copies a workflow identified by wf_uuid from src to dst.
312 # If args.recursive is True, also copy any collections
313 # referenced in the workflow definition yaml.
315 # The owner_uuid of the new workflow is set to any given
316 # project_uuid or the user who copied the template.
318 # Returns the copied workflow object.
320 def copy_workflow(wf_uuid, src, dst, args):
321 # fetch the workflow from the source instance
322 wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
324 if not wf["definition"]:
325 logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
327 # copy collections and docker images
328 if args.recursive and wf["definition"]:
329 wf_def = yaml.safe_load(wf["definition"])
330 if wf_def is not None:
# CWL packed workflows keep their steps under '$graph'; scan whichever
# structure is present for keep: references and docker images.
333 graph = wf_def.get('$graph', None)
334 if graph is not None:
335 workflow_collections(graph, locations, docker_images)
337 workflow_collections(wf_def, locations, docker_images)
340 copy_collections(locations, src, dst, args)
342 for image in docker_images:
343 copy_docker_image(image, docker_images[image], src, dst, args)
345 # copy the workflow itself
347 wf['owner_uuid'] = args.project_uuid
# Update an existing workflow of the same name in the destination
# project, otherwise create a new one.
349 existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
# NOTE(review): this .execute() omits num_retries unlike the other API
# calls in this file — looks inconsistent; confirm intent.
350 ["name", "=", wf["name"]]]).execute()
351 if len(existing["items"]) == 0:
352 return dst.workflows().create(body=wf).execute(num_retries=args.retries)
354 return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
# Recursively walk a workflow definition, accumulating keep: locations into
# 'locations' (list) and docker image name->tag pairs into 'docker_images'
# (dict).  Both accumulators are mutated in place; nothing is returned.
357 def workflow_collections(obj, locations, docker_images):
358 if isinstance(obj, dict):
359 loc = obj.get('location', None)
# Strip the "keep:" scheme prefix; accumulate only Keep references.
361 if loc.startswith("keep:"):
362 locations.append(loc[5:])
364 docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
365 if docker_image is not None:
# Split "repo:tag"; an image without an explicit tag defaults to 'latest'.
366 ds = docker_image.split(":", 1)
367 tag = ds[1] if len(ds)==2 else 'latest'
368 docker_images[ds[0]] = tag
# (the for-loops driving these recursive calls are elided from this view)
371 workflow_collections(obj[x], locations, docker_images)
372 elif isinstance(obj, list):
374 workflow_collections(x, locations, docker_images)
376 # copy_collections(obj, src, dst, args)
378 # Recursively copies all collections referenced by 'obj' from src
379 # to dst. obj may be a dict or a list, in which case we run
380 # copy_collections on every value it contains. If it is a string,
381 # search it for any substring that matches a collection hash or uuid
382 # (this will find hidden references to collections like
383 # "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
385 # Returns a copy of obj with any old collection uuids replaced by
388 def copy_collections(obj, src, dst, args):
390 def copy_collection_fn(collection_match):
391 """Helper function for regex substitution: copies a single collection,
392 identified by the collection_match MatchObject, to the
393 destination. Returns the destination collection uuid (or the
394 portable data hash if that's what src_id is).
397 src_id = collection_match.group(0)
# Memoize in the module-level collections_copied dict so each source
# collection is copied at most once per run.
398 if src_id not in collections_copied:
399 dst_col = copy_collection(src_id, src, dst, args)
# Portable data hashes are identical on both clusters, so the
# reference can stay as-is; uuids must be replaced.
400 if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
401 collections_copied[src_id] = src_id
403 collections_copied[src_id] = dst_col['uuid']
404 return collections_copied[src_id]
406 if isinstance(obj, basestring):
407 # Copy any collections identified in this string to dst, replacing
408 # them with the dst uuids as necessary.
409 obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
410 obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
# Preserve the container type (e.g. OrderedDict) when rebuilding.
412 elif isinstance(obj, dict):
413 return type(obj)((v, copy_collections(obj[v], src, dst, args))
415 elif isinstance(obj, list):
416 return type(obj)(copy_collections(v, src, dst, args) for v in obj)
420 def total_collection_size(manifest_text):
421 """Return the total number of bytes in this collection (excluding
422 duplicate blocks)."""
# Walk every manifest line; the first word is the stream name, the rest
# are block locators and file tokens (accumulator init lines elided here).
426 for line in manifest_text.splitlines():
428 for word in words[1:]:
430 loc = arvados.KeepLocator(word)
432 continue # this word isn't a locator, skip it
# Count each distinct block only once, keyed by its md5sum.
433 if loc.md5sum not in locators_seen:
434 locators_seen[loc.md5sum] = True
435 total_bytes += loc.size
439 def create_collection_from(c, src, dst, args):
440 """Create a new collection record on dst, and copy Docker metadata if
443 collection_uuid = c['uuid']
# Copy over only the whitelisted fields from the source record.
445 for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
# Fallback name for unnamed source collections.
449 body['name'] = "copied from " + collection_uuid
451 if args.storage_classes:
452 body['storage_classes_desired'] = args.storage_classes
454 body['owner_uuid'] = args.project_uuid
456 dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
458 # Create docker_image_repo+tag and docker_image_hash links
459 # at the destination.
460 for link_class in ("docker_image_repo+tag", "docker_image_hash"):
461 docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
463 for src_link in docker_links:
464 body = {key: src_link[key]
465 for key in ['link_class', 'name', 'properties']}
# Re-point the link at the newly created destination collection.
466 body['head_uuid'] = dst_collection['uuid']
467 body['owner_uuid'] = args.project_uuid
469 lk = dst.links().create(body=body).execute(num_retries=args.retries)
470 logger.debug('created dst link {}'.format(lk))
472 return dst_collection
474 # copy_collection(obj_uuid, src, dst, args)
476 # Copies the collection identified by obj_uuid from src to dst.
477 # Returns the collection object created at dst.
479 # If args.progress is True, produce a human-friendly progress
482 # If a collection with the desired portable_data_hash already
483 # exists at dst, and args.force is False, copy_collection returns
484 # the existing collection without copying any blocks. Otherwise
485 # (if no collection exists or if args.force is True)
486 # copy_collection copies all of the collection data blocks from src
489 # For this application, it is critical to preserve the
490 # collection's manifest hash, which is not guaranteed with the
491 # arvados.CollectionReader and arvados.CollectionWriter classes.
492 # Copying each block in the collection manually, followed by
493 # the manifest block, ensures that the collection's manifest
494 # hash will not change.
496 def copy_collection(obj_uuid, src, dst, args):
497 if arvados.util.keep_locator_pattern.match(obj_uuid):
498 # If the obj_uuid is a portable data hash, it might not be
499 # uniquely identified with a particular collection. As a
500 # result, it is ambiguous as to what name to use for the copy.
501 # Apply some heuristics to pick which collection to get the
503 srccol = src.collections().list(
504 filters=[['portable_data_hash', '=', obj_uuid]],
505 order="created_at asc"
506 ).execute(num_retries=args.retries)
508 items = srccol.get("items")
511 logger.warning("Could not find collection with portable data hash %s", obj_uuid)
517 # There's only one collection with the PDH, so use that.
520 # See if there is a collection that's in the same project
521 # as the root item (usually a workflow) being copied.
523 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
527 # Didn't find any collections located in the same project, so
528 # pick the oldest collection that has a name assigned to it.
534 # None of the collections have names (?!), so just pick the
538 # list() doesn't return manifest text (and we don't want it to,
539 # because we don't need the same manifest text sent to us 50
540 # times) so go and retrieve the collection object directly
541 # which will include the manifest text.
542 c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
544 # Assume this is an actual collection uuid, so fetch it directly.
545 c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
547 # If a collection with this hash already exists at the
548 # destination, and 'force' is not true, just return that
551 if 'portable_data_hash' in c:
552 colhash = c['portable_data_hash']
555 dstcol = dst.collections().list(
556 filters=[['portable_data_hash', '=', colhash]]
557 ).execute(num_retries=args.retries)
558 if dstcol['items_available'] > 0:
# Look for an exact match (same owner, name, and PDH) at the destination.
559 for d in dstcol['items']:
560 if ((args.project_uuid == d['owner_uuid']) and
561 (c.get('name') == d['name']) and
562 (c['portable_data_hash'] == d['portable_data_hash'])):
# Reuse the destination's (already signed) manifest text rather than
# re-uploading any blocks.
564 c['manifest_text'] = dst.collections().get(
565 uuid=dstcol['items'][0]['uuid']
566 ).execute(num_retries=args.retries)['manifest_text']
567 return create_collection_from(c, src, dst, args)
569 # Fetch the collection's manifest.
570 manifest = c['manifest_text']
571 logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
573 # Copy each block from src_keep to dst_keep.
574 # Use the newly signed locators returned from dst_keep to build
575 # a new manifest as we go.
576 src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
577 dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
578 dst_manifest = io.StringIO()
581 bytes_expected = total_collection_size(manifest)
583 progress_writer = ProgressWriter(human_progress)
585 progress_writer = None
587 # go through the words
588 # put each block loc into 'get' queue
589 # 'get' threads get block and put it into 'put' queue
590 # 'put' threads put block and then update dst_locators
592 # after going through the whole manifest we go back through it
593 # again and build dst_manifest
595 lock = threading.Lock()
597 # the get queue should be unbounded because we'll add all the
598 # block hashes we want to get, but these are small
599 get_queue = queue.Queue()
603 # the put queue contains full data blocks
604 # and if 'get' is faster than 'put' we could end up consuming
605 # a great deal of RAM if it isn't bounded.
606 put_queue = queue.Queue(threadcount)
# Worker loop: pull a locator off the get queue (None is the sentinel
# that ends the loop; those lines are elided from this view).
611 word = get_queue.get()
614 get_queue.task_done()
617 blockhash = arvados.KeepLocator(word).md5sum
# Skip blocks another worker already transferred.
619 if blockhash in dst_locators:
621 get_queue.task_done()
625 logger.debug("Getting block %s", word)
626 data = src_keep.get(word)
627 put_queue.put((word, data))
629 logger.error("Error getting block %s: %s", word, e)
630 transfer_error.append(e)
632 # Drain the 'get' queue so we end early
635 get_queue.task_done()
639 get_queue.task_done()
642 nonlocal bytes_written
644 item = put_queue.get()
646 put_queue.task_done()
650 loc = arvados.KeepLocator(word)
651 blockhash = loc.md5sum
653 if blockhash in dst_locators:
655 put_queue.task_done()
659 logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
660 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
# dst_locators and the byte counter are shared state; the 'with lock:'
# lines guarding these updates are elided from this view.
662 dst_locators[blockhash] = dst_locator
663 bytes_written += loc.size
665 progress_writer.report(obj_uuid, bytes_written, bytes_expected)
667 logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
669 # Drain the 'get' queue so we end early
672 get_queue.task_done()
675 transfer_error.append(e)
677 put_queue.task_done()
# First pass over the manifest: enqueue every block locator for transfer.
679 for line in manifest.splitlines():
681 for word in words[1:]:
683 loc = arvados.KeepLocator(word)
685 # If 'word' can't be parsed as a locator,
686 # presume it's a filename.
691 for i in range(0, threadcount):
694 for i in range(0, threadcount):
695 threading.Thread(target=get_thread, daemon=True).start()
697 for i in range(0, threadcount):
698 threading.Thread(target=put_thread, daemon=True).start()
703 if len(transfer_error) > 0:
704 return {"error_token": "Failed to transfer blocks"}
# Second pass: rebuild the manifest, substituting the newly signed
# destination locators for the source ones.
706 for line in manifest.splitlines():
708 dst_manifest.write(words[0])
709 for word in words[1:]:
711 loc = arvados.KeepLocator(word)
713 # If 'word' can't be parsed as a locator,
714 # presume it's a filename.
715 dst_manifest.write(' ')
716 dst_manifest.write(word)
718 blockhash = loc.md5sum
719 dst_manifest.write(' ')
720 dst_manifest.write(dst_locators[blockhash])
721 dst_manifest.write("\n")
724 progress_writer.report(obj_uuid, bytes_written, bytes_expected)
725 progress_writer.finish()
727 # Copy the manifest and save the collection.
728 logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
730 c['manifest_text'] = dst_manifest.getvalue()
731 return create_collection_from(c, src, dst, args)
# Pick a usable clone URL for the named repository on the given cluster,
# preferring https, then non-http protocols, with plain http last.
# Returns (git_url, git_config) where git_config is the list of extra
# `git -c` arguments needed to authenticate against that URL.
733 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
734 r = api.repositories().list(
735 filters=[['name', '=', repo_name]]).execute(num_retries=retries)
736 if r['items_available'] != 1:
737 raise Exception('cannot identify repo {}; {} repos found'
738 .format(repo_name, r['items_available']))
740 https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
741 http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
742 other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
744 priority = https_url + other_url + http_url
# (the loop over candidate URLs is partially elided from this view)
747 if url.startswith("http"):
748 u = urllib.parse.urlsplit(url)
749 baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
# Feed the API token to git as the password via a credential helper,
# without ever writing it to disk.
750 git_config = ["-c", "credential.%s/.username=none" % baseurl,
751 "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
756 logger.debug("trying %s", url)
# Probe the URL with ls-remote before committing to it.
758 ['git', *git_config, 'ls-remote', url],
761 'ARVADOS_API_TOKEN': api.api_token,
762 'GIT_ASKPASS': '/bin/false',
763 'HOME': os.environ['HOME'],
765 stdout=subprocess.DEVNULL,
767 except subprocess.CalledProcessError:
773 raise Exception('Cannot access git repository, tried {}'
# Plain-http URLs expose the token in transit; require explicit opt-in.
776 if git_url.startswith("http:"):
777 if allow_insecure_http:
778 logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
780 raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
782 return (git_url, git_config)
785 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
786 """Copy the docker image identified by docker_image and
787 docker_image_tag from src to dst. Create appropriate
788 docker_image_repo+tag and docker_image_hash links at dst.
792 logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
794 # Find the link identifying this docker image.
795 docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
796 src, args.retries, docker_image, docker_image_tag)
797 if docker_image_list:
# Use the first (most relevant) match from the image listing.
798 image_uuid, image_info = docker_image_list[0]
799 logger.debug('copying collection {} {}'.format(image_uuid, image_info))
801 # Copy the collection it refers to.
802 dst_image_col = copy_collection(image_uuid, src, dst, args)
# The "image" may itself be a Keep locator pointing at the image collection.
803 elif arvados.util.keep_locator_pattern.match(docker_image):
804 dst_image_col = copy_collection(docker_image, src, dst, args)
806 logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
# Recursively copy the project obj_uuid (and its collections, workflows and
# subprojects) from src to dst under owner_uuid.  Errors on individual items
# are accumulated into partial_error instead of aborting the whole copy.
808 def copy_project(obj_uuid, src, dst, owner_uuid, args):
810 src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
812 # Create/update the destination project
813 existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
814 ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
815 if len(existing["items"]) == 0:
816 project_record = dst.groups().create(body={"group": {"group_class": "project",
817 "owner_uuid": owner_uuid,
818 "name": src_project_record["name"]}}).execute(num_retries=args.retries)
820 project_record = existing["items"][0]
822 dst.groups().update(uuid=project_record["uuid"],
824 "description": src_project_record["description"]}}).execute(num_retries=args.retries)
# Subsequent copies land in the (possibly new) destination project.
826 args.project_uuid = project_record["uuid"]
828 logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
# Copy every collection owned by the source project.
835 copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
837 except Exception as e:
838 partial_error += "\n" + str(e)
# Copy every workflow owned by the source project.
841 for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
843 copy_workflow(w["uuid"], src, dst, args)
844 except Exception as e:
845 partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
# Recurse into subprojects.
848 for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
850 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
851 except Exception as e:
852 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
854 project_record["partial_error"] = partial_error
856 return project_record
# git_rev_parse(rev, repo)
#
# Returns the 40-character commit hash corresponding to 'rev' in
# git repository 'repo' (which must be the path of a local git
# repository).
def git_rev_parse(rev, repo):
    """Resolve *rev* to a full commit hash in the local repo at *repo*.

    Raises subprocess.CalledProcessError if git exits nonzero (e.g. the
    rev does not exist), or FileNotFoundError if git is not installed.
    """
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        check=True,
        cwd=repo,
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    # subprocess.run() captures stdout as a str, not a file object, so
    # the former `proc.stdout.read()` raised AttributeError; strip the
    # trailing newline from the captured text directly.
    return proc.stdout.strip()
874 # uuid_type(api, object_uuid)
876 # Returns the name of the class that object_uuid belongs to, based on
877 # the second field of the uuid. This function consults the api's
878 # schema to identify the object class.
880 # It returns a string such as 'Collection', 'Workflow', etc.
882 # Special case: if handed a Keep locator hash, return 'Collection'.
884 def uuid_type(api, object_uuid):
# Keep locators (and, per the branch below, http(s) URLs) are special-cased;
# their return statements are elided from this view.
885 if re.match(arvados.util.keep_locator_pattern, object_uuid):
888 if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
# The second dash-separated field of an Arvados uuid is the type prefix.
891 p = object_uuid.split('-')
894 for k in api._schema.schemas:
895 obj_class = api._schema.schemas[k].get('uuidPrefix', None)
896 if type_prefix == obj_class:
def copy_from_http(url, src, dst, args):
    """Store the content of an HTTP(S) URL in Keep on the destination.

    If the source cluster already holds a cached copy of the URL, that
    collection is copied to dst via copy_collection; otherwise the URL
    is downloaded directly into Keep on dst.  Returns the copied
    collection record, or {"uuid": ...} for a fresh download.
    """
    fetch_opts = {
        "varying_url_params": args.varying_url_params,
        "prefer_cached_downloads": args.prefer_cached_downloads,
    }
    # Reuse a cached copy from the source cluster when one exists.
    cached = arvados.http_to_keep.check_cached_url(
        src, args.project_uuid, url, {}, **fetch_opts)
    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)
    # No cached copy: download the URL straight into Keep on dst.
    fetched = arvados.http_to_keep.http_to_keep(
        dst, args.project_uuid, url, **fetch_opts)
    if fetched is not None:
        return {"uuid": fetched[2]}
# Log a fatal arv-copy message; presumably followed by exiting with 'code'
# (the exit line is elided from this view — confirm against the full file).
921 def abort(msg, code=1):
922 logger.info("arv-copy: %s", msg)
926 # Code for reporting on the progress of a collection upload.
927 # Stolen from arvados.commands.put.ArvPutCollectionWriter
928 # TODO(twp): figure out how to refactor into a shared library
929 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
# Machine-readable progress line; -1 stands in for an unknown total
# (the middle format arguments are elided from this view).
932 def machine_progress(obj_uuid, bytes_written, bytes_expected):
933 return "{} {}: {} {} written {} total\n".format(
938 -1 if (bytes_expected is None) else bytes_expected)
# Human-readable progress line, rewritten in place via the leading "\r".
# The branch condition on bytes_expected is elided from this view: the
# first return reports MB and percent complete, the second (used
# presumably when the total is unknown) reports raw bytes only.
940 def human_progress(obj_uuid, bytes_written, bytes_expected):
942 return "\r{}: {}M / {}M {:.1%} ".format(
944 bytes_written >> 20, bytes_expected >> 20,
945 float(bytes_written) / bytes_expected)
947 return "\r{}: {} ".format(obj_uuid, bytes_written)
# Writes progress lines produced by a formatting callback (such as
# human_progress above) to an output stream.
949 class ProgressWriter(object):
# Callback that formats (obj_uuid, bytes_written, bytes_expected) into a line.
950 _progress_func = None
953 def __init__(self, progress_func):
954 self._progress_func = progress_func
956 def report(self, obj_uuid, bytes_written, bytes_expected):
# A None callback disables progress output entirely.
957 if self._progress_func is not None:
959 self._progress_func(obj_uuid, bytes_written, bytes_expected))
# Terminate the in-place ("\r") progress line with a final newline.
962 self.outfile.write("\n")
964 if __name__ == '__main__':