Merge branch 'master' into 4904-arv-web
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31 import arvados.util
32 import arvados.commands._util as arv_cmd
33 import arvados.commands.keepdocker
34
35 logger = logging.getLogger('arvados.arv-copy')
36
37 # local_repo_dir records which git repositories from the Arvados source
38 # instance have been checked out locally during this run, and to which
39 # directories.
40 # e.g. if repository 'twp' from src_arv has been cloned into
41 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
42 #
43 local_repo_dir = {}
44
45 # List of collections that have been copied in this session, and their
46 # destination collection UUIDs.
47 collections_copied = {}
48
49 def main():
50     copy_opts = argparse.ArgumentParser(add_help=False)
51
52     copy_opts.add_argument(
53         '-v', '--verbose', dest='verbose', action='store_true',
54         help='Verbose output.')
55     copy_opts.add_argument(
56         '--progress', dest='progress', action='store_true',
57         help='Report progress on copying collections. (default)')
58     copy_opts.add_argument(
59         '--no-progress', dest='progress', action='store_false',
60         help='Do not report progress on copying collections.')
61     copy_opts.add_argument(
62         '-f', '--force', dest='force', action='store_true',
63         help='Perform copy even if the object appears to exist at the remote destination.')
64     copy_opts.add_argument(
65         '--src', dest='source_arvados', required=True,
66         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
67     copy_opts.add_argument(
68         '--dst', dest='destination_arvados', required=True,
69         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
70     copy_opts.add_argument(
71         '--recursive', dest='recursive', action='store_true',
72         help='Recursively copy any dependencies for this object. (default)')
73     copy_opts.add_argument(
74         '--no-recursive', dest='recursive', action='store_false',
75         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
76     copy_opts.add_argument(
77         '--dst-git-repo', dest='dst_git_repo',
78         help='The name of the destination git repository. Required when copying a pipeline recursively.')
79     copy_opts.add_argument(
80         '--project-uuid', dest='project_uuid',
81         help='The UUID of the project at the destination to which the pipeline should be copied.')
82     copy_opts.add_argument(
83         'object_uuid',
84         help='The UUID of the object to be copied.')
85     copy_opts.set_defaults(progress=True)
86     copy_opts.set_defaults(recursive=True)
87
88     parser = argparse.ArgumentParser(
89         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
90         parents=[copy_opts, arv_cmd.retry_opt])
91     args = parser.parse_args()
92
93     if args.verbose:
94         logger.setLevel(logging.DEBUG)
95     else:
96         logger.setLevel(logging.INFO)
97
98     # Create API clients for the source and destination instances
99     src_arv = api_for_instance(args.source_arvados)
100     dst_arv = api_for_instance(args.destination_arvados)
101
102     # Identify the kind of object we have been given, and begin copying.
103     t = uuid_type(src_arv, args.object_uuid)
104     if t == 'Collection':
105         result = copy_collection(args.object_uuid,
106                                  src_arv, dst_arv,
107                                  args)
108     elif t == 'PipelineInstance':
109         result = copy_pipeline_instance(args.object_uuid,
110                                         src_arv, dst_arv,
111                                         args)
112     elif t == 'PipelineTemplate':
113         result = copy_pipeline_template(args.object_uuid,
114                                         src_arv, dst_arv, args)
115     else:
116         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
117
118     # Clean up any outstanding temp git repositories.
119     for d in local_repo_dir.values():
120         shutil.rmtree(d, ignore_errors=True)
121
122     # If no exception was thrown and the response does not have an
123     # error_token field, presume success
124     if 'error_token' in result or 'uuid' not in result:
125         logger.error("API server returned an error result: {}".format(result))
126         exit(1)
127
128     logger.info("")
129     logger.info("Success: created copy with uuid {}".format(result['uuid']))
130     exit(0)
131
132 # api_for_instance(instance_name)
133 #
134 #     Creates an API client for the Arvados instance identified by
135 #     instance_name.
136 #
137 #     If instance_name contains a slash, it is presumed to be a path
138 #     (either local or absolute) to a file with Arvados configuration
139 #     settings.
140 #
141 #     Otherwise, it is presumed to be the name of a file in
142 #     $HOME/.config/arvados/instance_name.conf
143 #
144 def api_for_instance(instance_name):
145     if '/' in instance_name:
146         config_file = instance_name
147     else:
148         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
149
150     try:
151         cfg = arvados.config.load(config_file)
152     except (IOError, OSError) as e:
153         abort(("Could not open config file {}: {}\n" +
154                "You must make sure that your configuration tokens\n" +
155                "for Arvados instance {} are in {} and that this\n" +
156                "file is readable.").format(
157                    config_file, e, instance_name, config_file))
158
159     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
160         api_is_insecure = (
161             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
162                 ['1', 't', 'true', 'y', 'yes']))
163         client = arvados.api('v1',
164                              host=cfg['ARVADOS_API_HOST'],
165                              token=cfg['ARVADOS_API_TOKEN'],
166                              insecure=api_is_insecure)
167     else:
168         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
169     return client
170
171 # copy_pipeline_instance(pi_uuid, src, dst, args)
172 #
173 #    Copies a pipeline instance identified by pi_uuid from src to dst.
174 #
175 #    If the args.recursive option is set:
176 #      1. Copies all input collections
177 #           * For each component in the pipeline, include all collections
178 #             listed as job dependencies for that component)
179 #      2. Copy docker images
180 #      3. Copy git repositories
181 #      4. Copy the pipeline template
182 #
183 #    The only changes made to the copied pipeline instance are:
184 #      1. The original pipeline instance UUID is preserved in
185 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
186 #      2. The pipeline_template_uuid is changed to the new template uuid.
187 #      3. The owner_uuid of the instance is changed to the user who
188 #         copied it.
189 #
190 def copy_pipeline_instance(pi_uuid, src, dst, args):
191     # Fetch the pipeline instance record.
192     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
193
194     if args.recursive:
195         if not args.dst_git_repo:
196             abort('--dst-git-repo is required when copying a pipeline recursively.')
197         # Copy the pipeline template and save the copied template.
198         if pi.get('pipeline_template_uuid', None):
199             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
200                                         src, dst, args)
201
202         # Copy input collections, docker images and git repos.
203         pi = copy_collections(pi, src, dst, args)
204         copy_git_repos(pi, src, dst, args.dst_git_repo)
205         copy_docker_images(pi, src, dst, args)
206
207         # Update the fields of the pipeline instance with the copied
208         # pipeline template.
209         if pi.get('pipeline_template_uuid', None):
210             pi['pipeline_template_uuid'] = pt['uuid']
211
212     else:
213         # not recursive
214         logger.info("Copying only pipeline instance %s.", pi_uuid)
215         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
216
217     # Update the pipeline instance properties, and create the new
218     # instance at dst.
219     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
220     pi['description'] = "Pipeline copied from {}\n\n{}".format(
221         pi_uuid,
222         pi['description'] if pi.get('description', None) else '')
223     if args.project_uuid:
224         pi['owner_uuid'] = args.project_uuid
225     else:
226         del pi['owner_uuid']
227     del pi['uuid']
228
229     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute()
230     return new_pi
231
232 # copy_pipeline_template(pt_uuid, src, dst, args)
233 #
234 #    Copies a pipeline template identified by pt_uuid from src to dst.
235 #
236 #    If args.recursive is True, also copy any collections, docker
237 #    images and git repositories that this template references.
238 #
239 #    The owner_uuid of the new template is changed to that of the user
240 #    who copied the template.
241 #
242 #    Returns the copied pipeline template object.
243 #
244 def copy_pipeline_template(pt_uuid, src, dst, args):
245     # fetch the pipeline template from the source instance
246     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
247
248     if args.recursive:
249         if not args.dst_git_repo:
250             abort('--dst-git-repo is required when copying a pipeline recursively.')
251         # Copy input collections, docker images and git repos.
252         pt = copy_collections(pt, src, dst, args)
253         copy_git_repos(pt, src, dst, args.dst_git_repo)
254         copy_docker_images(pt, src, dst, args)
255
256     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
257         pt_uuid,
258         pt['description'] if pt.get('description', None) else '')
259     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
260     del pt['uuid']
261     del pt['owner_uuid']
262
263     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute()
264
265 # copy_collections(obj, src, dst, args)
266 #
267 #    Recursively copies all collections referenced by 'obj' from src
268 #    to dst.  obj may be a dict or a list, in which case we run
269 #    copy_collections on every value it contains. If it is a string,
270 #    search it for any substring that matches a collection hash or uuid
271 #    (this will find hidden references to collections like
272 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
273 #
274 #    Returns a copy of obj with any old collection uuids replaced by
275 #    the new ones.
276 #
277 def copy_collections(obj, src, dst, args):
278
279     def copy_collection_fn(collection_match):
280         """Helper function for regex substitution: copies a single collection,
281         identified by the collection_match MatchObject, to the
282         destination.  Returns the destination collection uuid (or the
283         portable data hash if that's what src_id is).
284
285         """
286         src_id = collection_match.group(0)
287         if src_id not in collections_copied:
288             dst_col = copy_collection(src_id, src, dst, args)
289             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
290                 collections_copied[src_id] = src_id
291             else:
292                 collections_copied[src_id] = dst_col['uuid']
293         return collections_copied[src_id]
294
295     if isinstance(obj, basestring):
296         # Copy any collections identified in this string to dst, replacing
297         # them with the dst uuids as necessary.
298         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
299         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
300         return obj
301     elif type(obj) == dict:
302         return {v: copy_collections(obj[v], src, dst, args) for v in obj}
303     elif type(obj) == list:
304         return [copy_collections(v, src, dst, args) for v in obj]
305     return obj
306
307 # copy_git_repos(p, src, dst, dst_repo)
308 #
309 #    Copies all git repositories referenced by pipeline instance or
310 #    template 'p' from src to dst.
311 #
312 #    For each component c in the pipeline:
313 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
314 #      * Rename script versions:
315 #          * c['script_version']
316 #          * c['job']['script_version']
317 #          * c['job']['supplied_script_version']
318 #        to the commit hashes they resolve to, since any symbolic
319 #        names (tags, branches) are not preserved in the destination repo.
320 #
321 #    The pipeline object is updated in place with the new repository
322 #    names.  The return value is undefined.
323 #
324 def copy_git_repos(p, src, dst, dst_repo):
325     copied = set()
326     for c in p['components']:
327         component = p['components'][c]
328         if 'repository' in component:
329             repo = component['repository']
330             script_version = component.get('script_version', None)
331             if repo not in copied:
332                 copy_git_repo(repo, src, dst, dst_repo, script_version)
333                 copied.add(repo)
334             component['repository'] = dst_repo
335             if script_version:
336                 repo_dir = local_repo_dir[repo]
337                 component['script_version'] = git_rev_parse(script_version, repo_dir)
338         if 'job' in component:
339             j = component['job']
340             if 'repository' in j:
341                 repo = j['repository']
342                 script_version = j.get('script_version', None)
343                 if repo not in copied:
344                     copy_git_repo(repo, src, dst, dst_repo, script_version)
345                     copied.add(repo)
346                 j['repository'] = dst_repo
347                 repo_dir = local_repo_dir[repo]
348                 if script_version:
349                     j['script_version'] = git_rev_parse(script_version, repo_dir)
350                 if 'supplied_script_version' in j:
351                     j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
352
353 def total_collection_size(manifest_text):
354     """Return the total number of bytes in this collection (excluding
355     duplicate blocks)."""
356
357     total_bytes = 0
358     locators_seen = {}
359     for line in manifest_text.splitlines():
360         words = line.split()
361         for word in words[1:]:
362             try:
363                 loc = arvados.KeepLocator(word)
364             except ValueError:
365                 continue  # this word isn't a locator, skip it
366             if loc.md5sum not in locators_seen:
367                 locators_seen[loc.md5sum] = True
368                 total_bytes += loc.size
369
370     return total_bytes
371
372 # copy_collection(obj_uuid, src, dst, args)
373 #
374 #    Copies the collection identified by obj_uuid from src to dst.
375 #    Returns the collection object created at dst.
376 #
377 #    If args.progress is True, produce a human-friendly progress
378 #    report.
379 #
380 #    If a collection with the desired portable_data_hash already
381 #    exists at dst, and args.force is False, copy_collection returns
382 #    the existing collection without copying any blocks.  Otherwise
383 #    (if no collection exists or if args.force is True)
384 #    copy_collection copies all of the collection data blocks from src
385 #    to dst.
386 #
387 #    For this application, it is critical to preserve the
388 #    collection's manifest hash, which is not guaranteed with the
389 #    arvados.CollectionReader and arvados.CollectionWriter classes.
390 #    Copying each block in the collection manually, followed by
391 #    the manifest block, ensures that the collection's manifest
392 #    hash will not change.
393 #
394 def copy_collection(obj_uuid, src, dst, args):
395     c = src.collections().get(uuid=obj_uuid).execute()
396
397     # If a collection with this hash already exists at the
398     # destination, and 'force' is not true, just return that
399     # collection.
400     if not args.force:
401         if 'portable_data_hash' in c:
402             colhash = c['portable_data_hash']
403         else:
404             colhash = c['uuid']
405         dstcol = dst.collections().list(
406             filters=[['portable_data_hash', '=', colhash]]
407         ).execute()
408         if dstcol['items_available'] > 0:
409             logger.debug("Skipping collection %s (already at dst)", obj_uuid)
410             return dstcol['items'][0]
411
412     # Fetch the collection's manifest.
413     manifest = c['manifest_text']
414     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
415
416     # Copy each block from src_keep to dst_keep.
417     # Use the newly signed locators returned from dst_keep to build
418     # a new manifest as we go.
419     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
420     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
421     dst_manifest = ""
422     dst_locators = {}
423     bytes_written = 0
424     bytes_expected = total_collection_size(manifest)
425     if args.progress:
426         progress_writer = ProgressWriter(human_progress)
427     else:
428         progress_writer = None
429
430     for line in manifest.splitlines(True):
431         words = line.split()
432         dst_manifest_line = words[0]
433         for word in words[1:]:
434             try:
435                 loc = arvados.KeepLocator(word)
436                 blockhash = loc.md5sum
437                 # copy this block if we haven't seen it before
438                 # (otherwise, just reuse the existing dst_locator)
439                 if blockhash not in dst_locators:
440                     logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
441                     if progress_writer:
442                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
443                     data = src_keep.get(word)
444                     dst_locator = dst_keep.put(data)
445                     dst_locators[blockhash] = dst_locator
446                     bytes_written += loc.size
447                 dst_manifest_line += ' ' + dst_locators[blockhash]
448             except ValueError:
449                 # If 'word' can't be parsed as a locator,
450                 # presume it's a filename.
451                 dst_manifest_line += ' ' + word
452         dst_manifest += dst_manifest_line
453         if line.endswith("\n"):
454             dst_manifest += "\n"
455
456     if progress_writer:
457         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
458         progress_writer.finish()
459
460     # Copy the manifest and save the collection.
461     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
462     dst_keep.put(dst_manifest)
463
464     if 'uuid' in c:
465         del c['uuid']
466     if 'owner_uuid' in c:
467         del c['owner_uuid']
468     c['manifest_text'] = dst_manifest
469     return dst.collections().create(body=c, ensure_unique_name=True).execute()
470
471 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version)
472 #
473 #    Copies commits from git repository 'src_git_repo' on Arvados
474 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
475 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
476 #    or "jsmith")
477 #
478 #    All commits will be copied to a destination branch named for the
479 #    source repository URL.
480 #
481 #    Because users cannot create their own repositories, the
482 #    destination repository must already exist.
483 #
484 #    The user running this command must be authenticated
485 #    to both repositories.
486 #
487 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version):
488     # Identify the fetch and push URLs for the git repositories.
489     r = src.repositories().list(
490         filters=[['name', '=', src_git_repo]]).execute()
491     if r['items_available'] != 1:
492         raise Exception('cannot identify source repo {}; {} repos found'
493                         .format(src_git_repo, r['items_available']))
494     src_git_url = r['items'][0]['fetch_url']
495     logger.debug('src_git_url: {}'.format(src_git_url))
496
497     r = dst.repositories().list(
498         filters=[['name', '=', dst_git_repo]]).execute()
499     if r['items_available'] != 1:
500         raise Exception('cannot identify destination repo {}; {} repos found'
501                         .format(dst_git_repo, r['items_available']))
502     dst_git_push_url  = r['items'][0]['push_url']
503     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
504
505     # script_version is the "script_version" parameter from the source
506     # component or job.  It is used here to tie the destination branch
507     # to the commit that was used on the source.  If no script_version
508     # was supplied in the component or job, it is a mistake in the pipeline,
509     # but for the purposes of copying the repository, default to "master".
510     #
511     if not script_version:
512         script_version = "master"
513
514     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
515
516     # Copy git commits from src repo to dst repo (but only if
517     # we have not already copied this repo in this session).
518     #
519     if src_git_repo in local_repo_dir:
520         logger.debug('already copied src repo %s, skipping', src_git_repo)
521     else:
522         tmprepo = tempfile.mkdtemp()
523         local_repo_dir[src_git_repo] = tmprepo
524         arvados.util.run_command(
525             ["git", "clone", "--bare", src_git_url, tmprepo],
526             cwd=os.path.dirname(tmprepo))
527         arvados.util.run_command(
528             ["git", "branch", dst_branch, script_version],
529             cwd=tmprepo)
530         arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
531         arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
532
533
534 def copy_docker_images(pipeline, src, dst, args):
535     """Copy any docker images named in the pipeline components'
536     runtime_constraints field from src to dst."""
537
538     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
539     for c_name, c_info in pipeline['components'].iteritems():
540         if ('runtime_constraints' in c_info and
541             'docker_image' in c_info['runtime_constraints']):
542             copy_docker_image(
543                 c_info['runtime_constraints']['docker_image'],
544                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
545                 src, dst, args)
546
547
548 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
549     """Copy the docker image identified by docker_image and
550     docker_image_tag from src to dst. Create appropriate
551     docker_image_repo+tag and docker_image_hash links at dst.
552
553     """
554
555     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
556
557     # Find the link identifying this docker image.
558     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
559         src, args.retries, docker_image, docker_image_tag)
560     image_uuid, image_info = docker_image_list[0]
561     logger.debug('copying collection {} {}'.format(image_uuid, image_info))
562
563     # Copy the collection it refers to.
564     dst_image_col = copy_collection(image_uuid, src, dst, args)
565
566     # Create docker_image_repo+tag and docker_image_hash links
567     # at the destination.
568     lk = dst.links().create(
569         body={
570             'head_uuid': dst_image_col['uuid'],
571             'link_class': 'docker_image_repo+tag',
572             'name': "{}:{}".format(docker_image, docker_image_tag),
573         }
574     ).execute(num_retries=args.retries)
575     logger.debug('created dst link {}'.format(lk))
576
577     lk = dst.links().create(
578         body={
579             'head_uuid': dst_image_col['uuid'],
580             'link_class': 'docker_image_hash',
581             'name': dst_image_col['portable_data_hash'],
582         }
583     ).execute(num_retries=args.retries)
584     logger.debug('created dst link {}'.format(lk))
585
586
587 # git_rev_parse(rev, repo)
588 #
589 #    Returns the 40-character commit hash corresponding to 'rev' in
590 #    git repository 'repo' (which must be the path of a local git
591 #    repository)
592 #
593 def git_rev_parse(rev, repo):
594     gitout, giterr = arvados.util.run_command(
595         ['git', 'rev-parse', rev], cwd=repo)
596     return gitout.strip()
597
598 # uuid_type(api, object_uuid)
599 #
600 #    Returns the name of the class that object_uuid belongs to, based on
601 #    the second field of the uuid.  This function consults the api's
602 #    schema to identify the object class.
603 #
604 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
605 #
606 #    Special case: if handed a Keep locator hash, return 'Collection'.
607 #
608 def uuid_type(api, object_uuid):
609     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
610         return 'Collection'
611     p = object_uuid.split('-')
612     if len(p) == 3:
613         type_prefix = p[1]
614         for k in api._schema.schemas:
615             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
616             if type_prefix == obj_class:
617                 return k
618     return None
619
620 def abort(msg, code=1):
621     logger.info("arv-copy: %s", msg)
622     exit(code)
623
624
625 # Code for reporting on the progress of a collection upload.
626 # Stolen from arvados.commands.put.ArvPutCollectionWriter
627 # TODO(twp): figure out how to refactor into a shared library
628 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
629 # code)
630
631 def machine_progress(obj_uuid, bytes_written, bytes_expected):
632     return "{} {}: {} {} written {} total\n".format(
633         sys.argv[0],
634         os.getpid(),
635         obj_uuid,
636         bytes_written,
637         -1 if (bytes_expected is None) else bytes_expected)
638
639 def human_progress(obj_uuid, bytes_written, bytes_expected):
640     if bytes_expected:
641         return "\r{}: {}M / {}M {:.1%} ".format(
642             obj_uuid,
643             bytes_written >> 20, bytes_expected >> 20,
644             float(bytes_written) / bytes_expected)
645     else:
646         return "\r{}: {} ".format(obj_uuid, bytes_written)
647
648 class ProgressWriter(object):
649     _progress_func = None
650     outfile = sys.stderr
651
652     def __init__(self, progress_func):
653         self._progress_func = progress_func
654
655     def report(self, obj_uuid, bytes_written, bytes_expected):
656         if self._progress_func is not None:
657             self.outfile.write(
658                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
659
660     def finish(self):
661         self.outfile.write("\n")
662
663 if __name__ == '__main__':
664     main()