1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 # arv-copy [--recursive] [--no-recursive] object-uuid
7 # Copies an object from Arvados instance src to instance dst.
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst. If either of these files is not found,
19 # arv-copy will issue an error.
from __future__ import division
from future import standard_library
from future.utils import listvalues
standard_library.install_aliases()
from past.builtins import basestring
from builtins import object

import argparse
import contextlib
import io
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
import urllib.parse

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
import ruamel.yaml as yaml

from arvados.api import OrderedJsonModel
from arvados._version import __version__
51 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
53 logger = logging.getLogger('arvados.arv-copy')
55 # local_repo_dir records which git repositories from the Arvados source
56 # instance have been checked out locally during this run, and to which
58 # e.g. if repository 'twp' from src_arv has been cloned into
59 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
63 # List of collections that have been copied in this session, and their
64 # destination collection UUIDs.
65 collections_copied = {}
67 # Set of (repository, script_version) two-tuples of commits copied in git.
68 scripts_copied = set()
70 # The owner_uuid of the object being copied
74 copy_opts = argparse.ArgumentParser(add_help=False)
76 copy_opts.add_argument(
77 '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
78 help='Print version and exit.')
79 copy_opts.add_argument(
80 '-v', '--verbose', dest='verbose', action='store_true',
81 help='Verbose output.')
82 copy_opts.add_argument(
83 '--progress', dest='progress', action='store_true',
84 help='Report progress on copying collections. (default)')
85 copy_opts.add_argument(
86 '--no-progress', dest='progress', action='store_false',
87 help='Do not report progress on copying collections.')
88 copy_opts.add_argument(
89 '-f', '--force', dest='force', action='store_true',
90 help='Perform copy even if the object appears to exist at the remote destination.')
91 copy_opts.add_argument(
92 '--src', dest='source_arvados',
93 help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will be inferred from the UUID of the object being copied.')
94 copy_opts.add_argument(
95 '--dst', dest='destination_arvados',
96 help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will use ARVADOS_API_HOST from environment.')
97 copy_opts.add_argument(
98 '--recursive', dest='recursive', action='store_true',
99 help='Recursively copy any dependencies for this object, and subprojects. (default)')
100 copy_opts.add_argument(
101 '--no-recursive', dest='recursive', action='store_false',
102 help='Do not copy any dependencies or subprojects.')
103 copy_opts.add_argument(
104 '--project-uuid', dest='project_uuid',
105 help='The UUID of the project at the destination to which the collection or workflow should be copied.')
106 copy_opts.add_argument(
107 '--storage-classes', dest='storage_classes',
108 help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
110 copy_opts.add_argument(
112 help='The UUID of the object to be copied.')
113 copy_opts.set_defaults(progress=True)
114 copy_opts.set_defaults(recursive=True)
116 parser = argparse.ArgumentParser(
117 description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
118 parents=[copy_opts, arv_cmd.retry_opt])
119 args = parser.parse_args()
121 if args.storage_classes:
122 args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
125 logger.setLevel(logging.DEBUG)
127 logger.setLevel(logging.INFO)
129 if not args.source_arvados:
130 args.source_arvados = args.object_uuid[:5]
132 # Create API clients for the source and destination instances
133 src_arv = api_for_instance(args.source_arvados, args.retries)
134 dst_arv = api_for_instance(args.destination_arvados, args.retries)
136 if not args.project_uuid:
137 args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
139 # Identify the kind of object we have been given, and begin copying.
140 t = uuid_type(src_arv, args.object_uuid)
141 if t == 'Collection':
142 set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
143 result = copy_collection(args.object_uuid,
146 elif t == 'Workflow':
147 set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
148 result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
150 set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
151 result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
153 abort("cannot copy object {} of type {}".format(args.object_uuid, t))
155 # Clean up any outstanding temp git repositories.
156 for d in listvalues(local_repo_dir):
157 shutil.rmtree(d, ignore_errors=True)
159 # If no exception was thrown and the response does not have an
160 # error_token field, presume success
161 if 'error_token' in result or 'uuid' not in result:
162 logger.error("API server returned an error result: {}".format(result))
165 print(result['uuid'])
167 if result.get('partial_error'):
168 logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
171 logger.info("Success: created copy with uuid {}".format(result['uuid']))
174 def set_src_owner_uuid(resource, uuid, args):
175 global src_owner_uuid
176 c = resource.get(uuid=uuid).execute(num_retries=args.retries)
177 src_owner_uuid = c.get("owner_uuid")
179 # api_for_instance(instance_name)
181 # Creates an API client for the Arvados instance identified by
184 # If instance_name contains a slash, it is presumed to be a path
185 # (either local or absolute) to a file with Arvados configuration
188 # Otherwise, it is presumed to be the name of a file in
189 # $HOME/.config/arvados/instance_name.conf
191 def api_for_instance(instance_name, num_retries):
192 if not instance_name:
194 return arvados.api('v1', model=OrderedJsonModel())
196 if '/' in instance_name:
197 config_file = instance_name
199 config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
202 cfg = arvados.config.load(config_file)
203 except (IOError, OSError) as e:
204 abort(("Could not open config file {}: {}\n" +
205 "You must make sure that your configuration tokens\n" +
206 "for Arvados instance {} are in {} and that this\n" +
207 "file is readable.").format(
208 config_file, e, instance_name, config_file))
210 if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
212 cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
213 ['1', 't', 'true', 'y', 'yes']))
214 client = arvados.api('v1',
215 host=cfg['ARVADOS_API_HOST'],
216 token=cfg['ARVADOS_API_TOKEN'],
217 insecure=api_is_insecure,
218 model=OrderedJsonModel(),
219 num_retries=num_retries,
222 abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
225 # Check if git is available
226 def check_git_availability():
229 ['git', '--version'],
231 stdout=subprocess.DEVNULL,
233 except FileNotFoundError:
234 abort('git command is not available. Please ensure git is installed.')
237 def filter_iter(arg):
238 """Iterate a filter string-or-list.
240 Pass in a filter field that can either be a string or list.
241 This will iterate elements as if the field had been written as a list.
243 if isinstance(arg, basestring):
248 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
249 """Update a single repository filter in-place for the destination.
251 If the filter checks that the repository is src_repository, it is
252 updated to check that the repository is dst_repository. If it does
253 anything else, this function raises ValueError.
255 if src_repository is None:
256 raise ValueError("component does not specify a source repository")
257 elif dst_repository is None:
258 raise ValueError("no destination repository specified to update repository filter")
259 elif repo_filter[1:] == ['=', src_repository]:
260 repo_filter[2] = dst_repository
261 elif repo_filter[1:] == ['in', [src_repository]]:
262 repo_filter[2] = [dst_repository]
264 raise ValueError("repository filter is not a simple source match")
266 def migrate_script_version_filter(version_filter):
267 """Update a single script_version filter in-place for the destination.
269 Currently this function checks that all the filter operands are Git
270 commit hashes. If they're not, it raises ValueError to indicate that
271 the filter is not portable. It could be extended to make other
272 transformations in the future.
274 if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
275 raise ValueError("script_version filter is not limited to commit hashes")
277 def attr_filtered(filter_, *attr_names):
278 """Return True if filter_ applies to any of attr_names, else False."""
279 return any((name == 'any') or (name in attr_names)
280 for name in filter_iter(filter_[0]))
282 @contextlib.contextmanager
283 def exception_handler(handler, *exc_types):
284 """If any exc_types are raised in the block, call handler on the exception."""
287 except exc_types as error:
291 # copy_workflow(wf_uuid, src, dst, args)
293 # Copies a workflow identified by wf_uuid from src to dst.
295 # If args.recursive is True, also copy any collections
296 # referenced in the workflow definition yaml.
298 # The owner_uuid of the new workflow is set to any given
299 # project_uuid or the user who copied the template.
301 # Returns the copied workflow object.
303 def copy_workflow(wf_uuid, src, dst, args):
304 # fetch the workflow from the source instance
305 wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
307 if not wf["definition"]:
308 logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
310 # copy collections and docker images
311 if args.recursive and wf["definition"]:
312 wf_def = yaml.safe_load(wf["definition"])
313 if wf_def is not None:
316 graph = wf_def.get('$graph', None)
317 if graph is not None:
318 workflow_collections(graph, locations, docker_images)
320 workflow_collections(wf_def, locations, docker_images)
323 copy_collections(locations, src, dst, args)
325 for image in docker_images:
326 copy_docker_image(image, docker_images[image], src, dst, args)
328 # copy the workflow itself
330 wf['owner_uuid'] = args.project_uuid
332 existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
333 ["name", "=", wf["name"]]]).execute()
334 if len(existing["items"]) == 0:
335 return dst.workflows().create(body=wf).execute(num_retries=args.retries)
337 return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
340 def workflow_collections(obj, locations, docker_images):
341 if isinstance(obj, dict):
342 loc = obj.get('location', None)
344 if loc.startswith("keep:"):
345 locations.append(loc[5:])
347 docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
348 if docker_image is not None:
349 ds = docker_image.split(":", 1)
350 tag = ds[1] if len(ds)==2 else 'latest'
351 docker_images[ds[0]] = tag
354 workflow_collections(obj[x], locations, docker_images)
355 elif isinstance(obj, list):
357 workflow_collections(x, locations, docker_images)
359 # copy_collections(obj, src, dst, args)
361 # Recursively copies all collections referenced by 'obj' from src
362 # to dst. obj may be a dict or a list, in which case we run
363 # copy_collections on every value it contains. If it is a string,
364 # search it for any substring that matches a collection hash or uuid
365 # (this will find hidden references to collections like
366 # "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
368 # Returns a copy of obj with any old collection uuids replaced by
371 def copy_collections(obj, src, dst, args):
373 def copy_collection_fn(collection_match):
374 """Helper function for regex substitution: copies a single collection,
375 identified by the collection_match MatchObject, to the
376 destination. Returns the destination collection uuid (or the
377 portable data hash if that's what src_id is).
380 src_id = collection_match.group(0)
381 if src_id not in collections_copied:
382 dst_col = copy_collection(src_id, src, dst, args)
383 if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
384 collections_copied[src_id] = src_id
386 collections_copied[src_id] = dst_col['uuid']
387 return collections_copied[src_id]
389 if isinstance(obj, basestring):
390 # Copy any collections identified in this string to dst, replacing
391 # them with the dst uuids as necessary.
392 obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
393 obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
395 elif isinstance(obj, dict):
396 return type(obj)((v, copy_collections(obj[v], src, dst, args))
398 elif isinstance(obj, list):
399 return type(obj)(copy_collections(v, src, dst, args) for v in obj)
403 def total_collection_size(manifest_text):
404 """Return the total number of bytes in this collection (excluding
405 duplicate blocks)."""
409 for line in manifest_text.splitlines():
411 for word in words[1:]:
413 loc = arvados.KeepLocator(word)
415 continue # this word isn't a locator, skip it
416 if loc.md5sum not in locators_seen:
417 locators_seen[loc.md5sum] = True
418 total_bytes += loc.size
422 def create_collection_from(c, src, dst, args):
423 """Create a new collection record on dst, and copy Docker metadata if
426 collection_uuid = c['uuid']
428 for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
432 body['name'] = "copied from " + collection_uuid
434 if args.storage_classes:
435 body['storage_classes_desired'] = args.storage_classes
437 body['owner_uuid'] = args.project_uuid
439 dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
441 # Create docker_image_repo+tag and docker_image_hash links
442 # at the destination.
443 for link_class in ("docker_image_repo+tag", "docker_image_hash"):
444 docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
446 for src_link in docker_links:
447 body = {key: src_link[key]
448 for key in ['link_class', 'name', 'properties']}
449 body['head_uuid'] = dst_collection['uuid']
450 body['owner_uuid'] = args.project_uuid
452 lk = dst.links().create(body=body).execute(num_retries=args.retries)
453 logger.debug('created dst link {}'.format(lk))
455 return dst_collection
457 # copy_collection(obj_uuid, src, dst, args)
459 # Copies the collection identified by obj_uuid from src to dst.
460 # Returns the collection object created at dst.
462 # If args.progress is True, produce a human-friendly progress
465 # If a collection with the desired portable_data_hash already
466 # exists at dst, and args.force is False, copy_collection returns
467 # the existing collection without copying any blocks. Otherwise
468 # (if no collection exists or if args.force is True)
469 # copy_collection copies all of the collection data blocks from src
472 # For this application, it is critical to preserve the
473 # collection's manifest hash, which is not guaranteed with the
474 # arvados.CollectionReader and arvados.CollectionWriter classes.
475 # Copying each block in the collection manually, followed by
476 # the manifest block, ensures that the collection's manifest
477 # hash will not change.
479 def copy_collection(obj_uuid, src, dst, args):
480 if arvados.util.keep_locator_pattern.match(obj_uuid):
481 # If the obj_uuid is a portable data hash, it might not be
482 # uniquely identified with a particular collection. As a
483 # result, it is ambiguous as to what name to use for the copy.
484 # Apply some heuristics to pick which collection to get the
486 srccol = src.collections().list(
487 filters=[['portable_data_hash', '=', obj_uuid]],
488 order="created_at asc"
489 ).execute(num_retries=args.retries)
491 items = srccol.get("items")
494 logger.warning("Could not find collection with portable data hash %s", obj_uuid)
500 # There's only one collection with the PDH, so use that.
503 # See if there is a collection that's in the same project
504 # as the root item (usually a workflow) being copied.
506 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
510 # Didn't find any collections located in the same project, so
511 # pick the oldest collection that has a name assigned to it.
517 # None of the collections have names (?!), so just pick the
521 # list() doesn't return manifest text (and we don't want it to,
522 # because we don't need the same maninfest text sent to us 50
523 # times) so go and retrieve the collection object directly
524 # which will include the manifest text.
525 c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
527 # Assume this is an actual collection uuid, so fetch it directly.
528 c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
530 # If a collection with this hash already exists at the
531 # destination, and 'force' is not true, just return that
534 if 'portable_data_hash' in c:
535 colhash = c['portable_data_hash']
538 dstcol = dst.collections().list(
539 filters=[['portable_data_hash', '=', colhash]]
540 ).execute(num_retries=args.retries)
541 if dstcol['items_available'] > 0:
542 for d in dstcol['items']:
543 if ((args.project_uuid == d['owner_uuid']) and
544 (c.get('name') == d['name']) and
545 (c['portable_data_hash'] == d['portable_data_hash'])):
547 c['manifest_text'] = dst.collections().get(
548 uuid=dstcol['items'][0]['uuid']
549 ).execute(num_retries=args.retries)['manifest_text']
550 return create_collection_from(c, src, dst, args)
552 # Fetch the collection's manifest.
553 manifest = c['manifest_text']
554 logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
556 # Copy each block from src_keep to dst_keep.
557 # Use the newly signed locators returned from dst_keep to build
558 # a new manifest as we go.
559 src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
560 dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
561 dst_manifest = io.StringIO()
564 bytes_expected = total_collection_size(manifest)
566 progress_writer = ProgressWriter(human_progress)
568 progress_writer = None
570 for line in manifest.splitlines():
572 dst_manifest.write(words[0])
573 for word in words[1:]:
575 loc = arvados.KeepLocator(word)
577 # If 'word' can't be parsed as a locator,
578 # presume it's a filename.
579 dst_manifest.write(' ')
580 dst_manifest.write(word)
582 blockhash = loc.md5sum
583 # copy this block if we haven't seen it before
584 # (otherwise, just reuse the existing dst_locator)
585 if blockhash not in dst_locators:
586 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
588 progress_writer.report(obj_uuid, bytes_written, bytes_expected)
589 data = src_keep.get(word)
590 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
591 dst_locators[blockhash] = dst_locator
592 bytes_written += loc.size
593 dst_manifest.write(' ')
594 dst_manifest.write(dst_locators[blockhash])
595 dst_manifest.write("\n")
598 progress_writer.report(obj_uuid, bytes_written, bytes_expected)
599 progress_writer.finish()
601 # Copy the manifest and save the collection.
602 logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
604 c['manifest_text'] = dst_manifest.getvalue()
605 return create_collection_from(c, src, dst, args)
607 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
608 r = api.repositories().list(
609 filters=[['name', '=', repo_name]]).execute(num_retries=retries)
610 if r['items_available'] != 1:
611 raise Exception('cannot identify repo {}; {} repos found'
612 .format(repo_name, r['items_available']))
614 https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
615 http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
616 other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
618 priority = https_url + other_url + http_url
621 if url.startswith("http"):
622 u = urllib.parse.urlsplit(url)
623 baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
624 git_config = ["-c", "credential.%s/.username=none" % baseurl,
625 "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
630 logger.debug("trying %s", url)
632 ['git', *git_config, 'ls-remote', url],
635 'ARVADOS_API_TOKEN': api.api_token,
636 'GIT_ASKPASS': '/bin/false',
637 'HOME': os.environ['HOME'],
639 stdout=subprocess.DEVNULL,
641 except subprocess.CalledProcessError:
647 raise Exception('Cannot access git repository, tried {}'
650 if git_url.startswith("http:"):
651 if allow_insecure_http:
652 logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
654 raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
656 return (git_url, git_config)
659 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
660 """Copy the docker image identified by docker_image and
661 docker_image_tag from src to dst. Create appropriate
662 docker_image_repo+tag and docker_image_hash links at dst.
666 logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
668 # Find the link identifying this docker image.
669 docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
670 src, args.retries, docker_image, docker_image_tag)
671 if docker_image_list:
672 image_uuid, image_info = docker_image_list[0]
673 logger.debug('copying collection {} {}'.format(image_uuid, image_info))
675 # Copy the collection it refers to.
676 dst_image_col = copy_collection(image_uuid, src, dst, args)
677 elif arvados.util.keep_locator_pattern.match(docker_image):
678 dst_image_col = copy_collection(docker_image, src, dst, args)
680 logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
682 def copy_project(obj_uuid, src, dst, owner_uuid, args):
684 src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
686 # Create/update the destination project
687 existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
688 ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
689 if len(existing["items"]) == 0:
690 project_record = dst.groups().create(body={"group": {"group_class": "project",
691 "owner_uuid": owner_uuid,
692 "name": src_project_record["name"]}}).execute(num_retries=args.retries)
694 project_record = existing["items"][0]
696 dst.groups().update(uuid=project_record["uuid"],
698 "description": src_project_record["description"]}}).execute(num_retries=args.retries)
700 args.project_uuid = project_record["uuid"]
702 logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
709 copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
711 except Exception as e:
712 partial_error += "\n" + str(e)
715 for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
717 copy_workflow(w["uuid"], src, dst, args)
718 except Exception as e:
719 partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
722 for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
724 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
725 except Exception as e:
726 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
728 project_record["partial_error"] = partial_error
730 return project_record
732 # git_rev_parse(rev, repo)
734 # Returns the 40-character commit hash corresponding to 'rev' in
735 # git repository 'repo' (which must be the path of a local git
738 def git_rev_parse(rev, repo):
739 proc = subprocess.run(
740 ['git', 'rev-parse', rev],
743 stdout=subprocess.PIPE,
746 return proc.stdout.read().strip()
748 # uuid_type(api, object_uuid)
750 # Returns the name of the class that object_uuid belongs to, based on
751 # the second field of the uuid. This function consults the api's
752 # schema to identify the object class.
754 # It returns a string such as 'Collection', 'Workflow', etc.
756 # Special case: if handed a Keep locator hash, return 'Collection'.
758 def uuid_type(api, object_uuid):
759 if re.match(arvados.util.keep_locator_pattern, object_uuid):
761 p = object_uuid.split('-')
764 for k in api._schema.schemas:
765 obj_class = api._schema.schemas[k].get('uuidPrefix', None)
766 if type_prefix == obj_class:
770 def abort(msg, code=1):
771 logger.info("arv-copy: %s", msg)
775 # Code for reporting on the progress of a collection upload.
776 # Stolen from arvados.commands.put.ArvPutCollectionWriter
777 # TODO(twp): figure out how to refactor into a shared library
778 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
781 def machine_progress(obj_uuid, bytes_written, bytes_expected):
782 return "{} {}: {} {} written {} total\n".format(
787 -1 if (bytes_expected is None) else bytes_expected)
789 def human_progress(obj_uuid, bytes_written, bytes_expected):
791 return "\r{}: {}M / {}M {:.1%} ".format(
793 bytes_written >> 20, bytes_expected >> 20,
794 float(bytes_written) / bytes_expected)
796 return "\r{}: {} ".format(obj_uuid, bytes_written)
798 class ProgressWriter(object):
799 _progress_func = None
802 def __init__(self, progress_func):
803 self._progress_func = progress_func
805 def report(self, obj_uuid, bytes_written, bytes_expected):
806 if self._progress_func is not None:
808 self._progress_func(obj_uuid, bytes_written, bytes_expected))
811 self.outfile.write("\n")
813 if __name__ == '__main__':