1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
7 # Copies an object from Arvados instance src to instance dst.
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst. If either of these files is not found,
19 # arv-copy will issue an error.
from __future__ import division
from future import standard_library
from future.utils import listvalues
standard_library.install_aliases()
from past.builtins import basestring
from builtins import object

import argparse
import contextlib
import logging
import os
import re
import shutil
import sys
import tempfile
import urllib.parse

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
import ruamel.yaml as yaml

from arvados.api import OrderedJsonModel
from arvados._version import __version__
# Matches a (possibly abbreviated) Git commit hash.
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# local directories they were cloned,
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
local_repo_dir = {}

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied (set by set_src_owner_uuid).
src_owner_uuid = None
# NOTE(review): several control-flow lines were lost from this copy of the
# script body (the 'object_uuid' positional argument, the verbose/quiet
# branch, the else/exit paths); they have been reconstructed -- verify
# against upstream arv-copy history.

# Build the option parser for arv-copy itself; arv_cmd.retry_opt supplies
# the shared --retries option via the 'parents' mechanism below.
copy_opts = argparse.ArgumentParser(add_help=False)

copy_opts.add_argument(
    '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
copy_opts.add_argument(
    '-v', '--verbose', dest='verbose', action='store_true',
    help='Verbose output.')
copy_opts.add_argument(
    '--progress', dest='progress', action='store_true',
    help='Report progress on copying collections. (default)')
copy_opts.add_argument(
    '--no-progress', dest='progress', action='store_false',
    help='Do not report progress on copying collections.')
copy_opts.add_argument(
    '-f', '--force', dest='force', action='store_true',
    help='Perform copy even if the object appears to exist at the remote destination.')
copy_opts.add_argument(
    '--src', dest='source_arvados', required=True,
    help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
copy_opts.add_argument(
    '--dst', dest='destination_arvados', required=True,
    help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
copy_opts.add_argument(
    '--recursive', dest='recursive', action='store_true',
    help='Recursively copy any dependencies for this object. (default)')
copy_opts.add_argument(
    '--no-recursive', dest='recursive', action='store_false',
    help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
copy_opts.add_argument(
    '--dst-git-repo', dest='dst_git_repo',
    help='The name of the destination git repository. Required when copying a pipeline recursively.')
copy_opts.add_argument(
    '--project-uuid', dest='project_uuid',
    help='The UUID of the project at the destination to which the pipeline should be copied.')
copy_opts.add_argument(
    '--allow-git-http-src', action="store_true",
    help='Allow cloning git repositories over insecure http')
copy_opts.add_argument(
    '--allow-git-http-dst', action="store_true",
    help='Allow pushing git repositories over insecure http')

copy_opts.add_argument(
    'object_uuid',
    help='The UUID of the object to be copied.')
copy_opts.set_defaults(progress=True)
copy_opts.set_defaults(recursive=True)

parser = argparse.ArgumentParser(
    description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
    parents=[copy_opts, arv_cmd.retry_opt])
args = parser.parse_args()

if args.verbose:
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)

# Create API clients for the source and destination instances
src_arv = api_for_instance(args.source_arvados)
dst_arv = api_for_instance(args.destination_arvados)

# Default the destination project to the current user's home project.
if not args.project_uuid:
    args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

# Identify the kind of object we have been given, and begin copying.
t = uuid_type(src_arv, args.object_uuid)
if t == 'Collection':
    set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
    result = copy_collection(args.object_uuid,
                             src_arv, dst_arv,
                             args)
elif t == 'Workflow':
    set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
    result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
else:
    abort("cannot copy object {} of type {}".format(args.object_uuid, t))

# Clean up any outstanding temp git repositories.
for d in listvalues(local_repo_dir):
    shutil.rmtree(d, ignore_errors=True)

# If no exception was thrown and the response does not have an
# error_token field, presume success
if 'error_token' in result or 'uuid' not in result:
    logger.error("API server returned an error result: {}".format(result))
    exit(1)
else:
    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)
def set_src_owner_uuid(resource, uuid, args):
    """Fetch the record identified by uuid from the given API resource and
    remember its owner_uuid in the module-global src_owner_uuid."""
    global src_owner_uuid
    record = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
169 # api_for_instance(instance_name)
171 # Creates an API client for the Arvados instance identified by
174 # If instance_name contains a slash, it is presumed to be a path
175 # (either local or absolute) to a file with Arvados configuration
178 # Otherwise, it is presumed to be the name of a file in
179 # $HOME/.config/arvados/instance_name.conf
def api_for_instance(instance_name):
    """Return an API client for the Arvados instance identified by
    instance_name (either a path to a config file, or a short name
    expanded to $HOME/.config/arvados/{name}.conf).  Aborts the program
    if the config cannot be read or lacks required credentials."""
    if '/' in instance_name:
        config_file = instance_name
    else:
        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    try:
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        # Any of these truthy spellings marks the host as insecure (TLS
        # verification disabled).
        api_is_insecure = (
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             model=OrderedJsonModel())
    else:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
    return client
# Check if git is available
def check_git_availability():
    """Abort the program if the 'git' executable cannot be run."""
    try:
        arvados.util.run_command(['git', '--help'])
    except Exception:
        abort('git command is not available. Please ensure git is installed.')
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, basestring):
        # A lone string is treated as a one-element list.
        return iter((arg,))
    else:
        return iter(arg)
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository.  If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes.  If they're not, it raises ValueError to indicate that
    the filter is not portable.  It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    for field_name in filter_iter(filter_[0]):
        # 'any' is a wildcard that matches every attribute.
        if field_name == 'any' or field_name in attr_names:
            return True
    return False
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)
271 # copy_workflow(wf_uuid, src, dst, args)
273 # Copies a workflow identified by wf_uuid from src to dst.
275 # If args.recursive is True, also copy any collections
276 # referenced in the workflow definition yaml.
278 # The owner_uuid of the new workflow is set to any given
279 # project_uuid or the user who copied the template.
281 # Returns the copied workflow object.
def copy_workflow(wf_uuid, src, dst, args):
    """Copy the workflow identified by wf_uuid from src to dst.

    If args.recursive is True, also copy any collections and docker
    images referenced in the workflow definition.  Returns the workflow
    record created at the destination.

    NOTE(review): the recursion guard and accumulator initializations were
    lost from this copy and have been reconstructed -- verify against
    upstream arv-copy history.
    """
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    # copy collections and docker images
    if args.recursive:
        wf_def = yaml.safe_load(wf["definition"])
        if wf_def is not None:
            locations = []
            docker_images = {}
            graph = wf_def.get('$graph', None)
            if graph is not None:
                workflow_collections(graph, locations, docker_images)
            else:
                workflow_collections(wf_def, locations, docker_images)

            if locations:
                copy_collections(locations, src, dst, args)

            for image in docker_images:
                copy_docker_image(image, docker_images[image], src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid
    return dst.workflows().create(body=wf).execute(num_retries=args.retries)
def workflow_collections(obj, locations, docker_images):
    """Walk a workflow definition (nested dicts/lists) collecting Keep
    locations into 'locations' and docker image name->tag pairs into
    'docker_images'.  Both accumulators are mutated in place."""
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                # Strip the "keep:" scheme prefix to leave the bare locator.
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds)==2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)
329 # copy_collections(obj, src, dst, args)
331 # Recursively copies all collections referenced by 'obj' from src
332 # to dst. obj may be a dict or a list, in which case we run
333 # copy_collections on every value it contains. If it is a string,
334 # search it for any substring that matches a collection hash or uuid
335 # (this will find hidden references to collections like
336 # "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
# Returns a copy of obj with any old collection uuids replaced by
# the new ones.
def copy_collections(obj, src, dst, args):
    """Recursively copy all collections referenced by 'obj' from src to dst.

    obj may be a dict, list, or string; strings are searched for embedded
    portable data hashes and collection UUIDs.  Returns a copy of obj
    with old collection identifiers replaced by the new ones.

    NOTE(review): the string-branch 'return obj', final fallthrough return,
    and dict-comprehension closing were lost from this copy and have been
    reconstructed -- verify against upstream arv-copy history.
    """

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        # words[0] is the stream name; the remaining words are block
        # locators and file tokens.
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size
    return total_bytes
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    body = {}
    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
        body[d] = c[d]

    # Collections aren't required to have a name, but the destination
    # record is easier to find with one.
    if not body['name']:
        body['name'] = "copied from " + collection_uuid

    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
424 # copy_collection(obj_uuid, src, dst, args)
426 # Copies the collection identified by obj_uuid from src to dst.
427 # Returns the collection object created at dst.
429 # If args.progress is True, produce a human-friendly progress
432 # If a collection with the desired portable_data_hash already
433 # exists at dst, and args.force is False, copy_collection returns
434 # the existing collection without copying any blocks. Otherwise
435 # (if no collection exists or if args.force is True)
436 # copy_collection copies all of the collection data blocks from src
439 # For this application, it is critical to preserve the
440 # collection's manifest hash, which is not guaranteed with the
441 # arvados.CollectionReader and arvados.CollectionWriter classes.
442 # Copying each block in the collection manually, followed by
443 # the manifest block, ensures that the collection's manifest
444 # hash will not change.
def copy_collection(obj_uuid, src, dst, args):
    """Copy the collection identified by obj_uuid (a UUID or portable data
    hash) from src to dst, block by block, preserving the manifest hash.
    Returns the collection record created (or found) at dst.

    NOTE(review): the name-selection heuristics, the 'if not args.force:'
    guard, and several initializations were lost from this copy and have
    been reconstructed -- verify against upstream arv-copy history.
    """
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection.  As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
            ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if len(items) == 0:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        else:
            # See if there is a collection that's in the same project
            # as the root item (usually a pipeline) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
            else:
                # Didn't find any collections located in the same project, so
                # pick the oldest collection that has a name assigned to it.
                for i in items:
                    if i.get("name"):
                        c = i
                        break
                else:
                    # None of the collections have names (?!), so just pick the
                    # first one.
                    c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = ""
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest += words[0]
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest += ' ' + word
                continue
            blockhash = loc.md5sum
            # copy this block if we haven't seen it before
            # (otherwise, just reuse the existing dst_locator)
            if blockhash not in dst_locators:
                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                if progress_writer:
                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                data = src_keep.get(word)
                dst_locator = dst_keep.put(data)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
            dst_manifest += ' ' + dst_locators[blockhash]
        dst_manifest += "\n"

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)

    c['manifest_text'] = dst_manifest
    return create_collection_from(c, src, dst, args)
def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    """Choose a usable clone URL for the named repository on the given
    Arvados instance, preferring https, then non-http schemes, then plain
    http.  Returns (git_url, git_config) where git_config is the extra
    'git -c' credential arguments needed for http(s) access.

    NOTE(review): the candidate-probing loop structure was lost from this
    copy and has been reconstructed -- verify against upstream history.
    """
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    priority = https_url + other_url + http_url

    git_config = []
    git_url = None
    for url in priority:
        if url.startswith("http"):
            u = urllib.parse.urlsplit(url)
            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
            # Supply the API token as the git password via a one-shot
            # credential helper, and suppress username prompts.
            git_config = ["-c", "credential.%s/.username=none" % baseurl,
                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
        else:
            git_config = []

        try:
            logger.debug("trying %s", url)
            arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
                                     env={"HOME": os.environ["HOME"],
                                          "ARVADOS_API_TOKEN": api.api_token,
                                          "GIT_ASKPASS": "/bin/false"})
        except arvados.errors.CommandFailedError:
            pass
        else:
            git_url = url
            break

    if not git_url:
        raise Exception('Cannot access git repository, tried {}'
                        .format(priority))

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        else:
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)
621 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
623 # Copies commits from git repository 'src_git_repo' on Arvados
624 # instance 'src' to 'dst_git_repo' on 'dst'. Both src_git_repo
625 # and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
628 # All commits will be copied to a destination branch named for the
629 # source repository URL.
631 # The destination repository must already exist.
633 # The user running this command must be authenticated
634 # to both repositories.
def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
    """Copy commits from git repository src_git_repo on instance src to
    dst_git_repo on dst, pushing them to a destination branch named after
    the source URL and script_version.  The destination repository must
    already exist and the user must be authenticated to both.

    NOTE(review): which commands sit inside the clone-once guard was lost
    from this copy and has been reconstructed -- verify against upstream
    history.
    """
    # Identify the fetch and push URLs for the git repositories.

    (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
    (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")

    logger.debug('src_git_url: {}'.format(src_git_url))
    logger.debug('dst_git_url: {}'.format(dst_git_url))

    # Branch name must be a valid ref, so squash non-word characters.
    dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))

    # Copy git commits from src repo to dst repo.
    if src_git_repo not in local_repo_dir:
        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
        arvados.util.run_command(
            ["git"] + src_git_config + ["clone", "--bare", src_git_url,
             local_repo_dir[src_git_repo]],
            cwd=os.path.dirname(local_repo_dir[src_git_repo]),
            env={"HOME": os.environ["HOME"],
                 "ARVADOS_API_TOKEN": src.api_token,
                 "GIT_ASKPASS": "/bin/false"})
        arvados.util.run_command(
            ["git", "remote", "add", "dst", dst_git_url],
            cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(
        ["git", "branch", dst_branch, script_version],
        cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
                             cwd=local_repo_dir[src_git_repo],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": dst.api_token,
                                  "GIT_ASKPASS": "/bin/false"})
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.
    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        # docker_image is itself a Keep locator; copy that collection.
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
693 # git_rev_parse(rev, repo)
695 # Returns the 40-character commit hash corresponding to 'rev' in
# git repository 'repo' (which must be the path of a local git
# repository).
def git_rev_parse(rev, repo):
    """Return the 40-character commit hash corresponding to 'rev' in the
    local git repository at path 'repo'."""
    stdout, _stderr = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return stdout.strip()
704 # uuid_type(api, object_uuid)
706 # Returns the name of the class that object_uuid belongs to, based on
707 # the second field of the uuid. This function consults the api's
708 # schema to identify the object class.
710 # It returns a string such as 'Collection', 'PipelineInstance', etc.
712 # Special case: if handed a Keep locator hash, return 'Collection'.
def uuid_type(api, object_uuid):
    """Return the name of the class that object_uuid belongs to, based on
    the second field of the uuid, consulting the api's schema.  Returns a
    string such as 'Collection', or None if no class matches.  A bare
    Keep locator hash is treated as a 'Collection'."""
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
        # Portable data hash: always a collection.
        return 'Collection'
    p = object_uuid.split('-')
    if len(p) == 3:
        # Arvados uuids look like {cluster}-{5-char type prefix}-{id}.
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
def abort(msg, code=1):
    """Log msg and terminate the program with the given exit code."""
    logger.info("arv-copy: %s", msg)
    exit(code)
731 # Code for reporting on the progress of a collection upload.
732 # Stolen from arvados.commands.put.ArvPutCollectionWriter
733 # TODO(twp): figure out how to refactor into a shared library
734 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
def machine_progress(obj_uuid, bytes_written, bytes_expected):
    """Return one machine-readable progress line; bytes_expected of None
    is reported as -1."""
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
    """Return one human-readable progress line (carriage-return prefixed so
    successive calls overwrite in place)."""
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        # Unknown total: just show the raw byte count.
        return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter(object):
    """Write progress reports produced by a formatting function (e.g.
    human_progress) to an output stream (stderr by default)."""
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        # A None progress_func silently disables reporting.
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        # Terminate the carriage-return-overwritten progress line.
        self.outfile.write("\n")
769 if __name__ == '__main__':