1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 # arv-copy [--recursive] [--no-recursive] object-uuid
7 # Copies an object from Arvados instance src to instance dst.
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst. If either of these files is not found,
19 # arv-copy will issue an error.
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
43 import arvados.commands._util as arv_cmd
44 import arvados.commands.keepdocker
45 import ruamel.yaml as yaml
47 from arvados.api import OrderedJsonModel
48 from arvados._version import __version__
# Matches a full or abbreviated (1-40 hex chars) git SHA-1 commit hash.
50 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
# Module-level logger for this command.
52 logger = logging.getLogger('arvados.arv-copy')
54 # local_repo_dir records which git repositories from the Arvados source
55 # instance have been checked out locally during this run, and to which
57 # e.g. if repository 'twp' from src_arv has been cloned into
58 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
# NOTE(review): the actual `local_repo_dir = {}` assignment (original lines
# 59-61) is not visible in this excerpt — confirm against the full file.
62 # List of collections that have been copied in this session, and their
63 # destination collection UUIDs.
64 collections_copied = {}
66 # Set of (repository, script_version) two-tuples of commits copied in git.
67 scripts_copied = set()
69 # The owner_uuid of the object being copied
# NOTE(review): the `src_owner_uuid = None` initialization (original line ~70)
# is not visible in this excerpt; set_src_owner_uuid() assigns it via `global`.
# Build the arv-copy specific option parser; add_help=False because it is
# composed into the full parser (with arv_cmd.retry_opt) below.
73 copy_opts = argparse.ArgumentParser(add_help=False)
75 copy_opts.add_argument(
76 '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
77 help='Print version and exit.')
78 copy_opts.add_argument(
79 '-v', '--verbose', dest='verbose', action='store_true',
80 help='Verbose output.')
81 copy_opts.add_argument(
82 '--progress', dest='progress', action='store_true',
83 help='Report progress on copying collections. (default)')
84 copy_opts.add_argument(
85 '--no-progress', dest='progress', action='store_false',
86 help='Do not report progress on copying collections.')
87 copy_opts.add_argument(
88 '-f', '--force', dest='force', action='store_true',
89 help='Perform copy even if the object appears to exist at the remote destination.')
90 copy_opts.add_argument(
91 '--src', dest='source_arvados',
92 help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
93 copy_opts.add_argument(
94 '--dst', dest='destination_arvados',
95 help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
96 copy_opts.add_argument(
97 '--recursive', dest='recursive', action='store_true',
98 help='Recursively copy any dependencies for this object, and subprojects. (default)')
99 copy_opts.add_argument(
100 '--no-recursive', dest='recursive', action='store_false',
101 help='Do not copy any dependencies or subprojects.')
102 copy_opts.add_argument(
103 '--project-uuid', dest='project_uuid',
104 help='The UUID of the project at the destination to which the collection or workflow should be copied.')
105 copy_opts.add_argument(
106 '--storage-classes', dest='storage_classes',
107 help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
# NOTE(review): the positional argument name (original line 110, presumably
# 'object_uuid' given the args.object_uuid references below) is missing from
# this excerpt.
109 copy_opts.add_argument(
111 help='The UUID of the object to be copied.')
112 copy_opts.set_defaults(progress=True)
113 copy_opts.set_defaults(recursive=True)
115 parser = argparse.ArgumentParser(
116 description='Copy a workflow or collection from one Arvados instance to another.',
117 parents=[copy_opts, arv_cmd.retry_opt])
118 args = parser.parse_args()
# Normalize --storage-classes into a list, dropping whitespace and empties.
120 if args.storage_classes:
121 args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
# NOTE(review): the `if args.verbose:` / `else:` lines (original 122-125)
# are missing from this excerpt; these two setLevel calls are the branch arms.
124 logger.setLevel(logging.DEBUG)
126 logger.setLevel(logging.INFO)
# Default the source instance to the cluster-id prefix of the object UUID.
128 if not args.source_arvados:
129 args.source_arvados = args.object_uuid[:5]
131 # Create API clients for the source and destination instances
132 src_arv = api_for_instance(args.source_arvados)
133 dst_arv = api_for_instance(args.destination_arvados)
# Default destination project to the current user's home project.
135 if not args.project_uuid:
136 args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
138 # Identify the kind of object we have been given, and begin copying.
139 t = uuid_type(src_arv, args.object_uuid)
140 if t == 'Collection':
141 set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
# NOTE(review): the remaining arguments to copy_collection (original 143-144)
# are missing from this excerpt.
142 result = copy_collection(args.object_uuid,
145 elif t == 'Workflow':
146 set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
147 result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
# NOTE(review): the `elif t == 'Group':` / `else:` lines (original 148/151)
# are missing from this excerpt.
149 set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
150 result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
152 abort("cannot copy object {} of type {}".format(args.object_uuid, t))
154 # Clean up any outstanding temp git repositories.
155 for d in listvalues(local_repo_dir):
156 shutil.rmtree(d, ignore_errors=True)
158 # If no exception was thrown and the response does not have an
159 # error_token field, presume success
160 if 'error_token' in result or 'uuid' not in result:
161 logger.error("API server returned an error result: {}".format(result))
# NOTE(review): the error-exit and `else:` lines (original 162-164) are
# missing from this excerpt.
165 logger.info("Success: created copy with uuid {}".format(result['uuid']))
def set_src_owner_uuid(resource, uuid, args):
    """Record the owner of the root object being copied.

    Fetches the record identified by *uuid* through *resource* (an
    Arvados API resource such as api.collections()) and stores its
    owner_uuid in the module-global src_owner_uuid, retrying the API
    call up to args.retries times.
    """
    global src_owner_uuid
    record = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
173 # api_for_instance(instance_name)
175 # Creates an API client for the Arvados instance identified by
178 # If instance_name contains a slash, it is presumed to be a path
179 # (either local or absolute) to a file with Arvados configuration
182 # Otherwise, it is presumed to be the name of a file in
183 # $HOME/.config/arvados/instance_name.conf
185 def api_for_instance(instance_name):
# Empty name: fall back to the default client built from the environment.
186 if not instance_name:
188 return arvados.api('v1', model=OrderedJsonModel())
190 if '/' in instance_name:
191 config_file = instance_name
# NOTE(review): the `else:` (original 192) and `try:` (original ~195)
# lines are missing from this excerpt.
193 config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
196 cfg = arvados.config.load(config_file)
197 except (IOError, OSError) as e:
198 abort(("Could not open config file {}: {}\n" +
199 "You must make sure that your configuration tokens\n" +
200 "for Arvados instance {} are in {} and that this\n" +
201 "file is readable.").format(
202 config_file, e, instance_name, config_file))
204 if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
# NOTE(review): the `api_is_insecure = (` opening line (original 205) is
# missing; these two lines are the body of that expression.
206 cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
207 ['1', 't', 'true', 'y', 'yes']))
208 client = arvados.api('v1',
209 host=cfg['ARVADOS_API_HOST'],
210 token=cfg['ARVADOS_API_TOKEN'],
211 insecure=api_is_insecure,
212 model=OrderedJsonModel())
# NOTE(review): the `else:` branch and `return client` (original 213, 215)
# are missing from this excerpt.
214 abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
217 # Check if git is available
218 def check_git_availability():
# Probe for a working `git` binary; abort with a user-facing message if
# the command cannot be run.
# NOTE(review): the `try:` / `except` lines (original 219, 221) are missing
# from this excerpt.
220 arvados.util.run_command(['git', '--help'])
222 abort('git command is not available. Please ensure git is installed.')
225 def filter_iter(arg):
226 """Iterate a filter string-or-list.
228 Pass in a filter field that can either be a string or list.
229 This will iterate elements as if the field had been written as a list.
# NOTE(review): the docstring terminator and the function body after this
# isinstance check (original lines 230, 232-234 — presumably the string
# and list iteration arms) are missing from this excerpt.
231 if isinstance(arg, basestring):
236 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
237 """Update a single repository filter in-place for the destination.
239 If the filter checks that the repository is src_repository, it is
240 updated to check that the repository is dst_repository. If it does
241 anything else, this function raises ValueError.
# NOTE(review): the docstring terminator (original 242) is missing from
# this excerpt.
243 if src_repository is None:
244 raise ValueError("component does not specify a source repository")
245 elif dst_repository is None:
246 raise ValueError("no destination repository specified to update repository filter")
# Simple equality match: ["repository", "=", src] -> ["repository", "=", dst].
247 elif repo_filter[1:] == ['=', src_repository]:
248 repo_filter[2] = dst_repository
# Single-element "in" match: ["repository", "in", [src]] -> [dst].
249 elif repo_filter[1:] == ['in', [src_repository]]:
250 repo_filter[2] = [dst_repository]
# NOTE(review): the `else:` line (original 251) is missing from this excerpt.
252 raise ValueError("repository filter is not a simple source match")
254 def migrate_script_version_filter(version_filter):
255 """Update a single script_version filter in-place for the destination.
257 Currently this function checks that all the filter operands are Git
258 commit hashes. If they're not, it raises ValueError to indicate that
259 the filter is not portable. It could be extended to make other
260 transformations in the future.
# NOTE(review): the docstring terminator (original 261) is missing from
# this excerpt.
# Every operand must look like a (possibly abbreviated) commit hash.
262 if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
263 raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False.

    A filter whose field is 'any' matches every attribute name.
    """
    for field_name in filter_iter(filter_[0]):
        if field_name == 'any' or field_name in attr_names:
            return True
    return False
270 @contextlib.contextmanager
271 def exception_handler(handler, *exc_types):
272 """If any exc_types are raised in the block, call handler on the exception."""
# NOTE(review): the `try:` / `yield` lines (original 273-274) and the
# handler invocation in the except body (original 276+) are missing from
# this excerpt.
275 except exc_types as error:
279 # copy_workflow(wf_uuid, src, dst, args)
281 # Copies a workflow identified by wf_uuid from src to dst.
283 # If args.recursive is True, also copy any collections
284 # referenced in the workflow definition yaml.
286 # The owner_uuid of the new workflow is set to any given
287 # project_uuid or the user who copied the template.
289 # Returns the copied workflow object.
291 def copy_workflow(wf_uuid, src, dst, args):
292 # fetch the workflow from the source instance
293 wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
295 # copy collections and docker images
# NOTE(review): the `if args.recursive:` guard (original 296) and the
# `locations` / `docker_images` initializations (original 299-300) are
# missing from this excerpt.
297 wf_def = yaml.safe_load(wf["definition"])
298 if wf_def is not None:
# A packed CWL document keeps its processes under '$graph'; otherwise
# scan the whole definition.
301 graph = wf_def.get('$graph', None)
302 if graph is not None:
303 workflow_collections(graph, locations, docker_images)
# NOTE(review): the `else:` line (original 304) is missing from this excerpt.
305 workflow_collections(wf_def, locations, docker_images)
# NOTE(review): the `if locations:` guard (original ~306-307) is missing.
308 copy_collections(locations, src, dst, args)
310 for image in docker_images:
311 copy_docker_image(image, docker_images[image], src, dst, args)
313 # copy the workflow itself
# NOTE(review): a `del wf['uuid']`-style cleanup (original 314) may be
# missing here — confirm against the full file.
315 wf['owner_uuid'] = args.project_uuid
# Upsert by (owner_uuid, name): create when absent, update when present.
317 existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
318 ["name", "=", wf["name"]]]).execute()
319 if len(existing["items"]) == 0:
320 return dst.workflows().create(body=wf).execute(num_retries=args.retries)
# NOTE(review): the `else:` line (original 321) is missing from this excerpt.
322 return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
# Recursively walk a workflow definition, accumulating Keep locations into
# `locations` and docker image name->tag pairs into `docker_images`.
325 def workflow_collections(obj, locations, docker_images):
326 if isinstance(obj, dict):
# 'location' values of the form "keep:<pdh>/..." reference collections.
327 loc = obj.get('location', None)
# NOTE(review): the `if loc is not None:` guard (original 328) is missing
# from this excerpt.
329 if loc.startswith("keep:"):
330 locations.append(loc[5:])
332 docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
333 if docker_image is not None:
# Split "name:tag"; default the tag to 'latest' when absent.
334 ds = docker_image.split(":", 1)
335 tag = ds[1] if len(ds)==2 else 'latest'
336 docker_images[ds[0]] = tag
# NOTE(review): the `for x in obj:` recursion headers (original 337-338
# and 341) are missing from this excerpt.
339 workflow_collections(obj[x], locations, docker_images)
340 elif isinstance(obj, list):
342 workflow_collections(x, locations, docker_images)
344 # copy_collections(obj, src, dst, args)
346 # Recursively copies all collections referenced by 'obj' from src
347 # to dst. obj may be a dict or a list, in which case we run
348 # copy_collections on every value it contains. If it is a string,
349 # search it for any substring that matches a collection hash or uuid
350 # (this will find hidden references to collections like
351 # "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
353 # Returns a copy of obj with any old collection uuids replaced by
356 def copy_collections(obj, src, dst, args):
358 def copy_collection_fn(collection_match):
359 """Helper function for regex substitution: copies a single collection,
360 identified by the collection_match MatchObject, to the
361 destination. Returns the destination collection uuid (or the
362 portable data hash if that's what src_id is).
# NOTE(review): the docstring terminator (original 363-364) is missing
# from this excerpt.
365 src_id = collection_match.group(0)
# Memoize in the module-global collections_copied so each source
# collection is copied at most once per run.
366 if src_id not in collections_copied:
367 dst_col = copy_collection(src_id, src, dst, args)
# When src_id was already a PDH (or uuid unchanged), keep it as-is.
368 if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
369 collections_copied[src_id] = src_id
# NOTE(review): the `else:` line (original 370) is missing from this excerpt.
371 collections_copied[src_id] = dst_col['uuid']
372 return collections_copied[src_id]
374 if isinstance(obj, basestring):
375 # Copy any collections identified in this string to dst, replacing
376 # them with the dst uuids as necessary.
377 obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
378 obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
# NOTE(review): a `return obj` (original 379) appears to be missing here.
380 elif isinstance(obj, dict):
# Rebuild with the same mapping type (e.g. OrderedDict) preserved.
381 return type(obj)((v, copy_collections(obj[v], src, dst, args))
# NOTE(review): the generator tail `for v in obj)` (original 382) and the
# final fall-through `return obj` (original 385+) are missing from this
# excerpt.
383 elif isinstance(obj, list):
384 return type(obj)(copy_collections(v, src, dst, args) for v in obj)
388 def total_collection_size(manifest_text):
389 """Return the total number of bytes in this collection (excluding
390 duplicate blocks)."""
# NOTE(review): the accumulator initializations (original 391-393,
# presumably total_bytes and locators_seen), the `words = line.split()`
# line (original 395), the `try:`/`except` pair around KeepLocator
# (original 397/399), and the final `return total_bytes` (original 404+)
# are missing from this excerpt.
394 for line in manifest_text.splitlines():
# Skip the stream name (words[0]); remaining words may be block locators.
396 for word in words[1:]:
398 loc = arvados.KeepLocator(word)
400 continue # this word isn't a locator, skip it
# Count each distinct block (by md5) only once.
401 if loc.md5sum not in locators_seen:
402 locators_seen[loc.md5sum] = True
403 total_bytes += loc.size
407 def create_collection_from(c, src, dst, args):
408 """Create a new collection record on dst, and copy Docker metadata if
# NOTE(review): the rest of the docstring and its terminator (original
# 409-410) are missing from this excerpt.
411 collection_uuid = c['uuid']
# NOTE(review): the `body = {}` initialization (original 412) and the loop
# body copying each field from c into body (original 414-416) are missing
# from this excerpt.
413 for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
# Fall back to a generated name when the source collection had none.
417 body['name'] = "copied from " + collection_uuid
419 if args.storage_classes:
420 body['storage_classes_desired'] = args.storage_classes
422 body['owner_uuid'] = args.project_uuid
424 dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
426 # Create docker_image_repo+tag and docker_image_hash links
427 # at the destination.
428 for link_class in ("docker_image_repo+tag", "docker_image_hash"):
429 docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
431 for src_link in docker_links:
# Rebuild each link against the new collection and project owner.
432 body = {key: src_link[key]
433 for key in ['link_class', 'name', 'properties']}
434 body['head_uuid'] = dst_collection['uuid']
435 body['owner_uuid'] = args.project_uuid
437 lk = dst.links().create(body=body).execute(num_retries=args.retries)
438 logger.debug('created dst link {}'.format(lk))
440 return dst_collection
442 # copy_collection(obj_uuid, src, dst, args)
444 # Copies the collection identified by obj_uuid from src to dst.
445 # Returns the collection object created at dst.
447 # If args.progress is True, produce a human-friendly progress
450 # If a collection with the desired portable_data_hash already
451 # exists at dst, and args.force is False, copy_collection returns
452 # the existing collection without copying any blocks. Otherwise
453 # (if no collection exists or if args.force is True)
454 # copy_collection copies all of the collection data blocks from src
457 # For this application, it is critical to preserve the
458 # collection's manifest hash, which is not guaranteed with the
459 # arvados.CollectionReader and arvados.CollectionWriter classes.
460 # Copying each block in the collection manually, followed by
461 # the manifest block, ensures that the collection's manifest
462 # hash will not change.
464 def copy_collection(obj_uuid, src, dst, args):
465 if arvados.util.keep_locator_pattern.match(obj_uuid):
466 # If the obj_uuid is a portable data hash, it might not be
467 # uniquely identified with a particular collection. As a
468 # result, it is ambiguous as to what name to use for the copy.
469 # Apply some heuristics to pick which collection to get the
471 srccol = src.collections().list(
472 filters=[['portable_data_hash', '=', obj_uuid]],
473 order="created_at asc"
474 ).execute(num_retries=args.retries)
476 items = srccol.get("items")
# NOTE(review): the `if not items:` guard (original ~477-478) and the
# subsequent branch structure (originals 480-505: len(items)==1 case,
# the loop over items, the oldest-named-collection fallback, and picking
# the first item) are largely missing from this excerpt.
479 logger.warning("Could not find collection with portable data hash %s", obj_uuid)
485 # There's only one collection with the PDH, so use that.
488 # See if there is a collection that's in the same project
489 # as the root item (usually a workflow) being copied.
491 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
495 # Didn't find any collections located in the same project, so
496 # pick the oldest collection that has a name assigned to it.
502 # None of the collections have names (?!), so just pick the
506 # list() doesn't return manifest text (and we don't want it to,
507 # because we don't need the same manifest text sent to us 50
508 # times) so go and retrieve the collection object directly
509 # which will include the manifest text.
510 c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
# NOTE(review): the `else:` line (original 511) is missing from this excerpt.
512 # Assume this is an actual collection uuid, so fetch it directly.
513 c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
515 # If a collection with this hash already exists at the
516 # destination, and 'force' is not true, just return that
519 if 'portable_data_hash' in c:
520 colhash = c['portable_data_hash']
# NOTE(review): the `else:` / `colhash = c['uuid']` lines (original
# 521-522) and the `if not args.force:` guard are missing from this
# excerpt.
523 dstcol = dst.collections().list(
524 filters=[['portable_data_hash', '=', colhash]]
525 ).execute(num_retries=args.retries)
526 if dstcol['items_available'] > 0:
527 for d in dstcol['items']:
# A same-owner, same-name, same-content match means no copy is needed.
528 if ((args.project_uuid == d['owner_uuid']) and
529 (c.get('name') == d['name']) and
530 (c['portable_data_hash'] == d['portable_data_hash'])):
# NOTE(review): the `return d` inside the match (original 531) appears to
# be missing from this excerpt.
532 c['manifest_text'] = dst.collections().get(
533 uuid=dstcol['items'][0]['uuid']
534 ).execute(num_retries=args.retries)['manifest_text']
535 return create_collection_from(c, src, dst, args)
537 # Fetch the collection's manifest.
538 manifest = c['manifest_text']
539 logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
541 # Copy each block from src_keep to dst_keep.
542 # Use the newly signed locators returned from dst_keep to build
543 # a new manifest as we go.
544 src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
545 dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
546 dst_manifest = io.StringIO()
# NOTE(review): the `dst_locators = {}` / `bytes_written = 0`
# initializations (original 547-548) and the `if args.progress:` / `else:`
# lines (original 550/552) are missing from this excerpt.
549 bytes_expected = total_collection_size(manifest)
551 progress_writer = ProgressWriter(human_progress)
553 progress_writer = None
555 for line in manifest.splitlines():
# NOTE(review): the `words = line.split()` line (original 556) and the
# `try:`/`except` pair around KeepLocator (original 559/561) are missing.
557 dst_manifest.write(words[0])
558 for word in words[1:]:
560 loc = arvados.KeepLocator(word)
562 # If 'word' can't be parsed as a locator,
563 # presume it's a filename.
564 dst_manifest.write(' ')
565 dst_manifest.write(word)
# NOTE(review): a `continue` after writing the filename (original 566)
# appears to be missing from this excerpt.
567 blockhash = loc.md5sum
568 # copy this block if we haven't seen it before
569 # (otherwise, just reuse the existing dst_locator)
570 if blockhash not in dst_locators:
571 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
# NOTE(review): the `if progress_writer:` guards (original 572 and 581-582)
# around the report calls are missing from this excerpt.
573 progress_writer.report(obj_uuid, bytes_written, bytes_expected)
574 data = src_keep.get(word)
575 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
576 dst_locators[blockhash] = dst_locator
577 bytes_written += loc.size
578 dst_manifest.write(' ')
579 dst_manifest.write(dst_locators[blockhash])
580 dst_manifest.write("\n")
583 progress_writer.report(obj_uuid, bytes_written, bytes_expected)
584 progress_writer.finish()
586 # Copy the manifest and save the collection.
587 logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
589 c['manifest_text'] = dst_manifest.getvalue()
590 return create_collection_from(c, src, dst, args)
# Pick a usable clone URL for the named repository, preferring https, then
# non-http protocols, then plain http (which requires explicit opt-in).
# Returns (git_url, git_config) where git_config is extra `git -c` options.
592 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
593 r = api.repositories().list(
594 filters=[['name', '=', repo_name]]).execute(num_retries=retries)
595 if r['items_available'] != 1:
596 raise Exception('cannot identify repo {}; {} repos found'
597 .format(repo_name, r['items_available']))
599 https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
600 http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
601 other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
603 priority = https_url + other_url + http_url
# NOTE(review): the loop header over `priority` and the `git_config`/
# `git_url` bookkeeping lines (original 604-607, 613-616) are missing
# from this excerpt.
608 if url.startswith("http"):
609 u = urllib.parse.urlsplit(url)
610 baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
# Feed ARVADOS_API_TOKEN to git as the http password via a credential
# helper, with an empty username.
611 git_config = ["-c", "credential.%s/.username=none" % baseurl,
612 "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
# Probe each candidate URL with `git ls-remote`; the first one that works
# wins.  GIT_ASKPASS=/bin/false prevents interactive password prompts.
617 logger.debug("trying %s", url)
618 arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
619 env={"HOME": os.environ["HOME"],
620 "ARVADOS_API_TOKEN": api.api_token,
621 "GIT_ASKPASS": "/bin/false"})
622 except arvados.errors.CommandFailedError:
# NOTE(review): the except body and the success/`break` handling
# (original 623-628) are missing from this excerpt.
629 raise Exception('Cannot access git repository, tried {}'
# NOTE(review): the .format(...) continuation (original 630) is missing.
632 if git_url.startswith("http:"):
633 if allow_insecure_http:
634 logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
# NOTE(review): the `else:` line (original 635) is missing from this excerpt.
636 raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
638 return (git_url, git_config)
641 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
642 """Copy the docker image identified by docker_image and
643 docker_image_tag from src to dst. Create appropriate
644 docker_image_repo+tag and docker_image_hash links at dst.
# NOTE(review): the docstring terminator (original 645-647) is missing
# from this excerpt.
648 logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
650 # Find the link identifying this docker image.
651 docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
652 src, args.retries, docker_image, docker_image_tag)
653 if docker_image_list:
654 image_uuid, image_info = docker_image_list[0]
655 logger.debug('copying collection {} {}'.format(image_uuid, image_info))
657 # Copy the collection it refers to.
# copy_collection also recreates the docker_image_* links at dst (see
# create_collection_from).
658 dst_image_col = copy_collection(image_uuid, src, dst, args)
# Fallback: the "image name" may itself be a Keep locator for the image
# collection.
659 elif arvados.util.keep_locator_pattern.match(docker_image):
660 dst_image_col = copy_collection(docker_image, src, dst, args)
# NOTE(review): the `else:` line (original 661) is missing from this excerpt.
662 logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
# Recursively copy the project obj_uuid (and its collections, workflows and
# subprojects) from src to dst, under the destination project owner_uuid.
# Returns the destination project record.
664 def copy_project(obj_uuid, src, dst, owner_uuid, args):
666 src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
668 # Create/update the destination project
# Upsert by (owner_uuid, name): create when absent, reuse when present.
669 existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
670 ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
671 if len(existing["items"]) == 0:
672 project_record = dst.groups().create(body={"group": {"group_class": "project",
673 "owner_uuid": owner_uuid,
674 "name": src_project_record["name"]}}).execute(num_retries=args.retries)
# NOTE(review): the `else:` line (original 675) is missing from this excerpt.
676 project_record = existing["items"][0]
# NOTE(review): the opening of the update body dict (original 679,
# presumably body={"group": {...) is missing from this excerpt.
678 dst.groups().update(uuid=project_record["uuid"],
680 "description": src_project_record["description"]}}).execute(num_retries=args.retries)
# Subsequent copies (collections/workflows below) land in this project.
682 args.project_uuid = project_record["uuid"]
684 logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
# NOTE(review): the "# Copy collections" comment and remaining positional
# arguments to copy_collections (original 685-690) are missing from this
# excerpt, as are the section comments before the workflow and subproject
# loops (original 693-694, 697).
687 copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
691 for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
692 copy_workflow(w["uuid"], src, dst, args)
695 for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
696 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
698 return project_record
700 # git_rev_parse(rev, repo)
702 # Returns the 40-character commit hash corresponding to 'rev' in
703 # git repository 'repo' (which must be the path of a local git
def git_rev_parse(rev, repo):
    """Resolve *rev* to a commit hash via `git rev-parse`.

    repo is the path of a local git repository; the command is run with
    that directory as its working directory.
    """
    stdout, _stderr = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return stdout.strip()
711 # uuid_type(api, object_uuid)
713 # Returns the name of the class that object_uuid belongs to, based on
714 # the second field of the uuid. This function consults the api's
715 # schema to identify the object class.
717 # It returns a string such as 'Collection', 'Workflow', etc.
719 # Special case: if handed a Keep locator hash, return 'Collection'.
721 def uuid_type(api, object_uuid):
722 if re.match(arvados.util.keep_locator_pattern, object_uuid):
# NOTE(review): the `return 'Collection'` line (original 723) is missing
# from this excerpt.
# Arvados UUIDs are "<cluster>-<type prefix>-<id>"; the middle field
# identifies the object class.
724 p = object_uuid.split('-')
# NOTE(review): the `type_prefix = p[1]`-style assignment and its guard
# (original 725-726) are missing from this excerpt, as are the
# `return k` / final `return None` lines (original 730-731).
727 for k in api._schema.schemas:
728 obj_class = api._schema.schemas[k].get('uuidPrefix', None)
729 if type_prefix == obj_class:
# Log an error message and terminate the program with the given exit code.
733 def abort(msg, code=1):
734 logger.info("arv-copy: %s", msg)
# NOTE(review): the exit call (original 735-736, presumably exit(code))
# is missing from this excerpt.
738 # Code for reporting on the progress of a collection upload.
739 # Stolen from arvados.commands.put.ArvPutCollectionWriter
740 # TODO(twp): figure out how to refactor into a shared library
741 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
# Machine-readable one-line progress record; -1 signals unknown size.
744 def machine_progress(obj_uuid, bytes_written, bytes_expected):
745 return "{} {}: {} {} written {} total\n".format(
# NOTE(review): the intermediate format arguments (original 746-749) are
# missing from this excerpt.
750 -1 if (bytes_expected is None) else bytes_expected)
# Human-readable progress line (carriage-return overwritten in place);
# shows MiB and percentage when the expected size is known.
752 def human_progress(obj_uuid, bytes_written, bytes_expected):
# NOTE(review): the `if bytes_expected:` guard (original 753), the obj_uuid
# format argument (original 755), and the `else:` line (original 758) are
# missing from this excerpt.
754 return "\r{}: {}M / {}M {:.1%} ".format(
756 bytes_written >> 20, bytes_expected >> 20,
757 float(bytes_written) / bytes_expected)
759 return "\r{}: {} ".format(obj_uuid, bytes_written)
# Writes progress strings produced by a formatter (human_progress or
# machine_progress) to an output stream.
761 class ProgressWriter(object):
# Formatter callable: (obj_uuid, bytes_written, bytes_expected) -> str.
762 _progress_func = None
# NOTE(review): the `outfile = sys.stderr`-style class attribute
# (original 763-764) is missing from this excerpt, though self.outfile
# is used below.
765 def __init__(self, progress_func):
766 self._progress_func = progress_func
768 def report(self, obj_uuid, bytes_written, bytes_expected):
769 if self._progress_func is not None:
# NOTE(review): the `self.outfile.write(` opening line (original 770) and
# the `def finish(self):` header (original 772-773) are missing from this
# excerpt.
771 self._progress_func(obj_uuid, bytes_written, bytes_expected))
# finish() terminates the in-place progress line with a newline.
774 self.outfile.write("\n")
776 if __name__ == '__main__':