Merge branch '4088-collection-show-files-filter'
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31 import arvados.util
32 import arvados.commands._util as arv_cmd
33 import arvados.commands.keepdocker
34
35 logger = logging.getLogger('arvados.arv-copy')
36
37 # local_repo_dir records which git repositories from the Arvados source
38 # instance have been checked out locally during this run, and to which
39 # directories.
40 # e.g. if repository 'twp' from src_arv has been cloned into
41 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
42 #
43 local_repo_dir = {}
44
45 # List of collections that have been copied in this session, and their
46 # destination collection UUIDs.
47 collections_copied = {}
48
49 def main():
50     copy_opts = argparse.ArgumentParser(add_help=False)
51
52     copy_opts.add_argument(
53         '-v', '--verbose', dest='verbose', action='store_true',
54         help='Verbose output.')
55     copy_opts.add_argument(
56         '--progress', dest='progress', action='store_true',
57         help='Report progress on copying collections. (default)')
58     copy_opts.add_argument(
59         '--no-progress', dest='progress', action='store_false',
60         help='Do not report progress on copying collections.')
61     copy_opts.add_argument(
62         '-f', '--force', dest='force', action='store_true',
63         help='Perform copy even if the object appears to exist at the remote destination.')
64     copy_opts.add_argument(
65         '--src', dest='source_arvados', required=True,
66         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
67     copy_opts.add_argument(
68         '--dst', dest='destination_arvados', required=True,
69         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
70     copy_opts.add_argument(
71         '--recursive', dest='recursive', action='store_true',
72         help='Recursively copy any dependencies for this object. (default)')
73     copy_opts.add_argument(
74         '--no-recursive', dest='recursive', action='store_false',
75         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
76     copy_opts.add_argument(
77         '--dst-git-repo', dest='dst_git_repo',
78         help='The name of the destination git repository. Required when copying a pipeline recursively.')
79     copy_opts.add_argument(
80         '--project-uuid', dest='project_uuid',
81         help='The UUID of the project at the destination to which the pipeline should be copied.')
82     copy_opts.add_argument(
83         'object_uuid',
84         help='The UUID of the object to be copied.')
85     copy_opts.set_defaults(progress=True)
86     copy_opts.set_defaults(recursive=True)
87
88     parser = argparse.ArgumentParser(
89         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
90         parents=[copy_opts, arv_cmd.retry_opt])
91     args = parser.parse_args()
92
93     if args.verbose:
94         logger.setLevel(logging.DEBUG)
95     else:
96         logger.setLevel(logging.INFO)
97
98     # Create API clients for the source and destination instances
99     src_arv = api_for_instance(args.source_arvados)
100     dst_arv = api_for_instance(args.destination_arvados)
101
102     # Identify the kind of object we have been given, and begin copying.
103     t = uuid_type(src_arv, args.object_uuid)
104     if t == 'Collection':
105         result = copy_collection(args.object_uuid,
106                                  src_arv, dst_arv,
107                                  args)
108     elif t == 'PipelineInstance':
109         result = copy_pipeline_instance(args.object_uuid,
110                                         src_arv, dst_arv,
111                                         args)
112     elif t == 'PipelineTemplate':
113         result = copy_pipeline_template(args.object_uuid,
114                                         src_arv, dst_arv, args)
115     else:
116         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
117
118     # Clean up any outstanding temp git repositories.
119     for d in local_repo_dir.values():
120         shutil.rmtree(d, ignore_errors=True)
121
122     # If no exception was thrown and the response does not have an
123     # error_token field, presume success
124     if 'error_token' in result or 'uuid' not in result:
125         logger.error("API server returned an error result: {}".format(result))
126         exit(1)
127
128     logger.info("")
129     logger.info("Success: created copy with uuid {}".format(result['uuid']))
130     exit(0)
131
132 # api_for_instance(instance_name)
133 #
134 #     Creates an API client for the Arvados instance identified by
135 #     instance_name.
136 #
137 #     If instance_name contains a slash, it is presumed to be a path
138 #     (either local or absolute) to a file with Arvados configuration
139 #     settings.
140 #
141 #     Otherwise, it is presumed to be the name of a file in
142 #     $HOME/.config/arvados/instance_name.conf
143 #
144 def api_for_instance(instance_name):
145     if '/' in instance_name:
146         config_file = instance_name
147     else:
148         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
149
150     try:
151         cfg = arvados.config.load(config_file)
152     except (IOError, OSError) as e:
153         abort(("Could not open config file {}: {}\n" +
154                "You must make sure that your configuration tokens\n" +
155                "for Arvados instance {} are in {} and that this\n" +
156                "file is readable.").format(
157                    config_file, e, instance_name, config_file))
158
159     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
160         api_is_insecure = (
161             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
162                 ['1', 't', 'true', 'y', 'yes']))
163         client = arvados.api('v1',
164                              host=cfg['ARVADOS_API_HOST'],
165                              token=cfg['ARVADOS_API_TOKEN'],
166                              insecure=api_is_insecure,
167                              cache=False)
168     else:
169         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
170     return client
171
172 # copy_pipeline_instance(pi_uuid, src, dst, args)
173 #
174 #    Copies a pipeline instance identified by pi_uuid from src to dst.
175 #
176 #    If the args.recursive option is set:
177 #      1. Copies all input collections
178 #           * For each component in the pipeline, include all collections
179 #             listed as job dependencies for that component)
180 #      2. Copy docker images
181 #      3. Copy git repositories
182 #      4. Copy the pipeline template
183 #
184 #    The only changes made to the copied pipeline instance are:
185 #      1. The original pipeline instance UUID is preserved in
186 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
187 #      2. The pipeline_template_uuid is changed to the new template uuid.
188 #      3. The owner_uuid of the instance is changed to the user who
189 #         copied it.
190 #
191 def copy_pipeline_instance(pi_uuid, src, dst, args):
192     # Fetch the pipeline instance record.
193     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
194
195     if args.recursive:
196         if not args.dst_git_repo:
197             abort('--dst-git-repo is required when copying a pipeline recursively.')
198         # Copy the pipeline template and save the copied template.
199         if pi.get('pipeline_template_uuid', None):
200             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
201                                         src, dst, args)
202
203         # Copy input collections, docker images and git repos.
204         pi = copy_collections(pi, src, dst, args)
205         copy_git_repos(pi, src, dst, args.dst_git_repo)
206         copy_docker_images(pi, src, dst, args)
207
208         # Update the fields of the pipeline instance with the copied
209         # pipeline template.
210         if pi.get('pipeline_template_uuid', None):
211             pi['pipeline_template_uuid'] = pt['uuid']
212
213     else:
214         # not recursive
215         logger.info("Copying only pipeline instance %s.", pi_uuid)
216         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
217
218     # Update the pipeline instance properties, and create the new
219     # instance at dst.
220     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
221     pi['description'] = "Pipeline copied from {}\n\n{}".format(
222         pi_uuid,
223         pi['description'] if pi.get('description', None) else '')
224     if args.project_uuid:
225         pi['owner_uuid'] = args.project_uuid
226     else:
227         del pi['owner_uuid']
228     del pi['uuid']
229
230     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute()
231     return new_pi
232
233 # copy_pipeline_template(pt_uuid, src, dst, args)
234 #
235 #    Copies a pipeline template identified by pt_uuid from src to dst.
236 #
237 #    If args.recursive is True, also copy any collections, docker
238 #    images and git repositories that this template references.
239 #
240 #    The owner_uuid of the new template is changed to that of the user
241 #    who copied the template.
242 #
243 #    Returns the copied pipeline template object.
244 #
245 def copy_pipeline_template(pt_uuid, src, dst, args):
246     # fetch the pipeline template from the source instance
247     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
248
249     if args.recursive:
250         if not args.dst_git_repo:
251             abort('--dst-git-repo is required when copying a pipeline recursively.')
252         # Copy input collections, docker images and git repos.
253         pt = copy_collections(pt, src, dst, args)
254         copy_git_repos(pt, src, dst, args.dst_git_repo)
255         copy_docker_images(pt, src, dst, args)
256
257     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
258         pt_uuid,
259         pt['description'] if pt.get('description', None) else '')
260     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
261     del pt['uuid']
262     del pt['owner_uuid']
263
264     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute()
265
266 # copy_collections(obj, src, dst, args)
267 #
268 #    Recursively copies all collections referenced by 'obj' from src
269 #    to dst.  obj may be a dict or a list, in which case we run
270 #    copy_collections on every value it contains. If it is a string,
271 #    search it for any substring that matches a collection hash or uuid
272 #    (this will find hidden references to collections like
273 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
274 #
275 #    Returns a copy of obj with any old collection uuids replaced by
276 #    the new ones.
277 #
278 def copy_collections(obj, src, dst, args):
279
280     def copy_collection_fn(collection_match):
281         """Helper function for regex substitution: copies a single collection,
282         identified by the collection_match MatchObject, to the
283         destination.  Returns the destination collection uuid (or the
284         portable data hash if that's what src_id is).
285
286         """
287         src_id = collection_match.group(0)
288         if src_id not in collections_copied:
289             dst_col = copy_collection(src_id, src, dst, args)
290             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
291                 collections_copied[src_id] = src_id
292             else:
293                 collections_copied[src_id] = dst_col['uuid']
294         return collections_copied[src_id]
295
296     if isinstance(obj, basestring):
297         # Copy any collections identified in this string to dst, replacing
298         # them with the dst uuids as necessary.
299         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
300         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
301         return obj
302     elif type(obj) == dict:
303         return {v: copy_collections(obj[v], src, dst, args) for v in obj}
304     elif type(obj) == list:
305         return [copy_collections(v, src, dst, args) for v in obj]
306     return obj
307
308 # copy_git_repos(p, src, dst, dst_repo)
309 #
310 #    Copies all git repositories referenced by pipeline instance or
311 #    template 'p' from src to dst.
312 #
313 #    For each component c in the pipeline:
314 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
315 #      * Rename script versions:
316 #          * c['script_version']
317 #          * c['job']['script_version']
318 #          * c['job']['supplied_script_version']
319 #        to the commit hashes they resolve to, since any symbolic
320 #        names (tags, branches) are not preserved in the destination repo.
321 #
322 #    The pipeline object is updated in place with the new repository
323 #    names.  The return value is undefined.
324 #
325 def copy_git_repos(p, src, dst, dst_repo):
326     copied = set()
327     for c in p['components']:
328         component = p['components'][c]
329         if 'repository' in component:
330             repo = component['repository']
331             script_version = component.get('script_version', None)
332             if repo not in copied:
333                 copy_git_repo(repo, src, dst, dst_repo, script_version)
334                 copied.add(repo)
335             component['repository'] = dst_repo
336             if script_version:
337                 repo_dir = local_repo_dir[repo]
338                 component['script_version'] = git_rev_parse(script_version, repo_dir)
339         if 'job' in component:
340             j = component['job']
341             if 'repository' in j:
342                 repo = j['repository']
343                 script_version = j.get('script_version', None)
344                 if repo not in copied:
345                     copy_git_repo(repo, src, dst, dst_repo, script_version)
346                     copied.add(repo)
347                 j['repository'] = dst_repo
348                 repo_dir = local_repo_dir[repo]
349                 if script_version:
350                     j['script_version'] = git_rev_parse(script_version, repo_dir)
351                 if 'supplied_script_version' in j:
352                     j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
353
354 def total_collection_size(manifest_text):
355     """Return the total number of bytes in this collection (excluding
356     duplicate blocks)."""
357
358     total_bytes = 0
359     locators_seen = {}
360     for line in manifest_text.splitlines():
361         words = line.split()
362         for word in words[1:]:
363             try:
364                 loc = arvados.KeepLocator(word)
365             except ValueError:
366                 continue  # this word isn't a locator, skip it
367             if loc.md5sum not in locators_seen:
368                 locators_seen[loc.md5sum] = True
369                 total_bytes += loc.size
370
371     return total_bytes
372
373 # copy_collection(obj_uuid, src, dst, args)
374 #
375 #    Copies the collection identified by obj_uuid from src to dst.
376 #    Returns the collection object created at dst.
377 #
378 #    If args.progress is True, produce a human-friendly progress
379 #    report.
380 #
381 #    If a collection with the desired portable_data_hash already
382 #    exists at dst, and args.force is False, copy_collection returns
383 #    the existing collection without copying any blocks.  Otherwise
384 #    (if no collection exists or if args.force is True)
385 #    copy_collection copies all of the collection data blocks from src
386 #    to dst.
387 #
388 #    For this application, it is critical to preserve the
389 #    collection's manifest hash, which is not guaranteed with the
390 #    arvados.CollectionReader and arvados.CollectionWriter classes.
391 #    Copying each block in the collection manually, followed by
392 #    the manifest block, ensures that the collection's manifest
393 #    hash will not change.
394 #
395 def copy_collection(obj_uuid, src, dst, args):
396     c = src.collections().get(uuid=obj_uuid).execute()
397
398     # If a collection with this hash already exists at the
399     # destination, and 'force' is not true, just return that
400     # collection.
401     if not args.force:
402         if 'portable_data_hash' in c:
403             colhash = c['portable_data_hash']
404         else:
405             colhash = c['uuid']
406         dstcol = dst.collections().list(
407             filters=[['portable_data_hash', '=', colhash]]
408         ).execute()
409         if dstcol['items_available'] > 0:
410             logger.debug("Skipping collection %s (already at dst)", obj_uuid)
411             return dstcol['items'][0]
412
413     # Fetch the collection's manifest.
414     manifest = c['manifest_text']
415     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
416
417     # Copy each block from src_keep to dst_keep.
418     # Use the newly signed locators returned from dst_keep to build
419     # a new manifest as we go.
420     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
421     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
422     dst_manifest = ""
423     dst_locators = {}
424     bytes_written = 0
425     bytes_expected = total_collection_size(manifest)
426     if args.progress:
427         progress_writer = ProgressWriter(human_progress)
428     else:
429         progress_writer = None
430
431     for line in manifest.splitlines(True):
432         words = line.split()
433         dst_manifest_line = words[0]
434         for word in words[1:]:
435             try:
436                 loc = arvados.KeepLocator(word)
437                 blockhash = loc.md5sum
438                 # copy this block if we haven't seen it before
439                 # (otherwise, just reuse the existing dst_locator)
440                 if blockhash not in dst_locators:
441                     logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
442                     if progress_writer:
443                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
444                     data = src_keep.get(word)
445                     dst_locator = dst_keep.put(data)
446                     dst_locators[blockhash] = dst_locator
447                     bytes_written += loc.size
448                 dst_manifest_line += ' ' + dst_locators[blockhash]
449             except ValueError:
450                 # If 'word' can't be parsed as a locator,
451                 # presume it's a filename.
452                 dst_manifest_line += ' ' + word
453         dst_manifest += dst_manifest_line
454         if line.endswith("\n"):
455             dst_manifest += "\n"
456
457     if progress_writer:
458         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
459         progress_writer.finish()
460
461     # Copy the manifest and save the collection.
462     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
463     dst_keep.put(dst_manifest)
464
465     if 'uuid' in c:
466         del c['uuid']
467     if 'owner_uuid' in c:
468         del c['owner_uuid']
469     c['manifest_text'] = dst_manifest
470     return dst.collections().create(body=c, ensure_unique_name=True).execute()
471
472 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version)
473 #
474 #    Copies commits from git repository 'src_git_repo' on Arvados
475 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
476 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
477 #    or "jsmith")
478 #
479 #    All commits will be copied to a destination branch named for the
480 #    source repository URL.
481 #
482 #    Because users cannot create their own repositories, the
483 #    destination repository must already exist.
484 #
485 #    The user running this command must be authenticated
486 #    to both repositories.
487 #
488 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version):
489     # Identify the fetch and push URLs for the git repositories.
490     r = src.repositories().list(
491         filters=[['name', '=', src_git_repo]]).execute()
492     if r['items_available'] != 1:
493         raise Exception('cannot identify source repo {}; {} repos found'
494                         .format(src_git_repo, r['items_available']))
495     src_git_url = r['items'][0]['fetch_url']
496     logger.debug('src_git_url: {}'.format(src_git_url))
497
498     r = dst.repositories().list(
499         filters=[['name', '=', dst_git_repo]]).execute()
500     if r['items_available'] != 1:
501         raise Exception('cannot identify destination repo {}; {} repos found'
502                         .format(dst_git_repo, r['items_available']))
503     dst_git_push_url  = r['items'][0]['push_url']
504     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
505
506     # script_version is the "script_version" parameter from the source
507     # component or job.  It is used here to tie the destination branch
508     # to the commit that was used on the source.  If no script_version
509     # was supplied in the component or job, it is a mistake in the pipeline,
510     # but for the purposes of copying the repository, default to "master".
511     #
512     if not script_version:
513         script_version = "master"
514
515     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
516
517     # Copy git commits from src repo to dst repo (but only if
518     # we have not already copied this repo in this session).
519     #
520     if src_git_repo in local_repo_dir:
521         logger.debug('already copied src repo %s, skipping', src_git_repo)
522     else:
523         tmprepo = tempfile.mkdtemp()
524         local_repo_dir[src_git_repo] = tmprepo
525         arvados.util.run_command(
526             ["git", "clone", "--bare", src_git_url, tmprepo],
527             cwd=os.path.dirname(tmprepo))
528         arvados.util.run_command(
529             ["git", "branch", dst_branch, script_version],
530             cwd=tmprepo)
531         arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
532         arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
533
534
535 def copy_docker_images(pipeline, src, dst, args):
536     """Copy any docker images named in the pipeline components'
537     runtime_constraints field from src to dst."""
538
539     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
540     for c_name, c_info in pipeline['components'].iteritems():
541         if ('runtime_constraints' in c_info and
542             'docker_image' in c_info['runtime_constraints']):
543             copy_docker_image(
544                 c_info['runtime_constraints']['docker_image'],
545                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
546                 src, dst, args)
547
548
549 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
550     """Copy the docker image identified by docker_image and
551     docker_image_tag from src to dst. Create appropriate
552     docker_image_repo+tag and docker_image_hash links at dst.
553
554     """
555
556     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
557
558     # Find the link identifying this docker image.
559     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
560         src, args.retries, docker_image, docker_image_tag)
561     image_uuid, image_info = docker_image_list[0]
562     logger.debug('copying collection {} {}'.format(image_uuid, image_info))
563
564     # Copy the collection it refers to.
565     dst_image_col = copy_collection(image_uuid, src, dst, args)
566
567     # Create docker_image_repo+tag and docker_image_hash links
568     # at the destination.
569     lk = dst.links().create(
570         body={
571             'head_uuid': dst_image_col['uuid'],
572             'link_class': 'docker_image_repo+tag',
573             'name': "{}:{}".format(docker_image, docker_image_tag),
574         }
575     ).execute(num_retries=args.retries)
576     logger.debug('created dst link {}'.format(lk))
577
578     lk = dst.links().create(
579         body={
580             'head_uuid': dst_image_col['uuid'],
581             'link_class': 'docker_image_hash',
582             'name': dst_image_col['portable_data_hash'],
583         }
584     ).execute(num_retries=args.retries)
585     logger.debug('created dst link {}'.format(lk))
586
587
588 # git_rev_parse(rev, repo)
589 #
590 #    Returns the 40-character commit hash corresponding to 'rev' in
591 #    git repository 'repo' (which must be the path of a local git
592 #    repository)
593 #
594 def git_rev_parse(rev, repo):
595     gitout, giterr = arvados.util.run_command(
596         ['git', 'rev-parse', rev], cwd=repo)
597     return gitout.strip()
598
599 # uuid_type(api, object_uuid)
600 #
601 #    Returns the name of the class that object_uuid belongs to, based on
602 #    the second field of the uuid.  This function consults the api's
603 #    schema to identify the object class.
604 #
605 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
606 #
607 #    Special case: if handed a Keep locator hash, return 'Collection'.
608 #
609 def uuid_type(api, object_uuid):
610     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
611         return 'Collection'
612     p = object_uuid.split('-')
613     if len(p) == 3:
614         type_prefix = p[1]
615         for k in api._schema.schemas:
616             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
617             if type_prefix == obj_class:
618                 return k
619     return None
620
621 def abort(msg, code=1):
622     logger.info("arv-copy: %s", msg)
623     exit(code)
624
625
626 # Code for reporting on the progress of a collection upload.
627 # Stolen from arvados.commands.put.ArvPutCollectionWriter
628 # TODO(twp): figure out how to refactor into a shared library
629 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
630 # code)
631
632 def machine_progress(obj_uuid, bytes_written, bytes_expected):
633     return "{} {}: {} {} written {} total\n".format(
634         sys.argv[0],
635         os.getpid(),
636         obj_uuid,
637         bytes_written,
638         -1 if (bytes_expected is None) else bytes_expected)
639
640 def human_progress(obj_uuid, bytes_written, bytes_expected):
641     if bytes_expected:
642         return "\r{}: {}M / {}M {:.1%} ".format(
643             obj_uuid,
644             bytes_written >> 20, bytes_expected >> 20,
645             float(bytes_written) / bytes_expected)
646     else:
647         return "\r{}: {} ".format(obj_uuid, bytes_written)
648
649 class ProgressWriter(object):
650     _progress_func = None
651     outfile = sys.stderr
652
653     def __init__(self, progress_func):
654         self._progress_func = progress_func
655
656     def report(self, obj_uuid, bytes_written, bytes_expected):
657         if self._progress_func is not None:
658             self.outfile.write(
659                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
660
661     def finish(self):
662         self.outfile.write("\n")
663
664 if __name__ == '__main__':
665     main()