Merge branch 'origin-5309-keepproxy-panic' closes #5309
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31 import arvados.util
32 import arvados.commands._util as arv_cmd
33 import arvados.commands.keepdocker
34
35 logger = logging.getLogger('arvados.arv-copy')
36
37 # local_repo_dir records which git repositories from the Arvados source
38 # instance have been checked out locally during this run, and to which
39 # directories.
40 # e.g. if repository 'twp' from src_arv has been cloned into
41 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
42 #
43 local_repo_dir = {}
44
45 # List of collections that have been copied in this session, and their
46 # destination collection UUIDs.
47 collections_copied = {}
48
49 def main():
50     copy_opts = argparse.ArgumentParser(add_help=False)
51
52     copy_opts.add_argument(
53         '-v', '--verbose', dest='verbose', action='store_true',
54         help='Verbose output.')
55     copy_opts.add_argument(
56         '--progress', dest='progress', action='store_true',
57         help='Report progress on copying collections. (default)')
58     copy_opts.add_argument(
59         '--no-progress', dest='progress', action='store_false',
60         help='Do not report progress on copying collections.')
61     copy_opts.add_argument(
62         '-f', '--force', dest='force', action='store_true',
63         help='Perform copy even if the object appears to exist at the remote destination.')
64     copy_opts.add_argument(
65         '--src', dest='source_arvados', required=True,
66         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
67     copy_opts.add_argument(
68         '--dst', dest='destination_arvados', required=True,
69         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
70     copy_opts.add_argument(
71         '--recursive', dest='recursive', action='store_true',
72         help='Recursively copy any dependencies for this object. (default)')
73     copy_opts.add_argument(
74         '--no-recursive', dest='recursive', action='store_false',
75         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
76     copy_opts.add_argument(
77         '--dst-git-repo', dest='dst_git_repo',
78         help='The name of the destination git repository. Required when copying a pipeline recursively.')
79     copy_opts.add_argument(
80         '--project-uuid', dest='project_uuid',
81         help='The UUID of the project at the destination to which the pipeline should be copied.')
82     copy_opts.add_argument(
83         'object_uuid',
84         help='The UUID of the object to be copied.')
85     copy_opts.set_defaults(progress=True)
86     copy_opts.set_defaults(recursive=True)
87
88     parser = argparse.ArgumentParser(
89         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
90         parents=[copy_opts, arv_cmd.retry_opt])
91     args = parser.parse_args()
92
93     if args.verbose:
94         logger.setLevel(logging.DEBUG)
95     else:
96         logger.setLevel(logging.INFO)
97
98     # Create API clients for the source and destination instances
99     src_arv = api_for_instance(args.source_arvados)
100     dst_arv = api_for_instance(args.destination_arvados)
101
102     if not args.project_uuid:
103         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
104
105     # Identify the kind of object we have been given, and begin copying.
106     t = uuid_type(src_arv, args.object_uuid)
107     if t == 'Collection':
108         result = copy_collection(args.object_uuid,
109                                  src_arv, dst_arv,
110                                  args)
111     elif t == 'PipelineInstance':
112         result = copy_pipeline_instance(args.object_uuid,
113                                         src_arv, dst_arv,
114                                         args)
115     elif t == 'PipelineTemplate':
116         result = copy_pipeline_template(args.object_uuid,
117                                         src_arv, dst_arv, args)
118     else:
119         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
120
121     # Clean up any outstanding temp git repositories.
122     for d in local_repo_dir.values():
123         shutil.rmtree(d, ignore_errors=True)
124
125     # If no exception was thrown and the response does not have an
126     # error_token field, presume success
127     if 'error_token' in result or 'uuid' not in result:
128         logger.error("API server returned an error result: {}".format(result))
129         exit(1)
130
131     logger.info("")
132     logger.info("Success: created copy with uuid {}".format(result['uuid']))
133     exit(0)
134
135 # api_for_instance(instance_name)
136 #
137 #     Creates an API client for the Arvados instance identified by
138 #     instance_name.
139 #
140 #     If instance_name contains a slash, it is presumed to be a path
141 #     (either local or absolute) to a file with Arvados configuration
142 #     settings.
143 #
144 #     Otherwise, it is presumed to be the name of a file in
145 #     $HOME/.config/arvados/instance_name.conf
146 #
147 def api_for_instance(instance_name):
148     if '/' in instance_name:
149         config_file = instance_name
150     else:
151         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
152
153     try:
154         cfg = arvados.config.load(config_file)
155     except (IOError, OSError) as e:
156         abort(("Could not open config file {}: {}\n" +
157                "You must make sure that your configuration tokens\n" +
158                "for Arvados instance {} are in {} and that this\n" +
159                "file is readable.").format(
160                    config_file, e, instance_name, config_file))
161
162     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
163         api_is_insecure = (
164             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
165                 ['1', 't', 'true', 'y', 'yes']))
166         client = arvados.api('v1',
167                              host=cfg['ARVADOS_API_HOST'],
168                              token=cfg['ARVADOS_API_TOKEN'],
169                              insecure=api_is_insecure)
170     else:
171         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
172     return client
173
174 # copy_pipeline_instance(pi_uuid, src, dst, args)
175 #
176 #    Copies a pipeline instance identified by pi_uuid from src to dst.
177 #
178 #    If the args.recursive option is set:
179 #      1. Copies all input collections
180 #           * For each component in the pipeline, include all collections
181 #             listed as job dependencies for that component)
182 #      2. Copy docker images
183 #      3. Copy git repositories
184 #      4. Copy the pipeline template
185 #
186 #    The only changes made to the copied pipeline instance are:
187 #      1. The original pipeline instance UUID is preserved in
188 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
189 #      2. The pipeline_template_uuid is changed to the new template uuid.
190 #      3. The owner_uuid of the instance is changed to the user who
191 #         copied it.
192 #
193 def copy_pipeline_instance(pi_uuid, src, dst, args):
194     # Fetch the pipeline instance record.
195     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
196
197     if args.recursive:
198         if not args.dst_git_repo:
199             abort('--dst-git-repo is required when copying a pipeline recursively.')
200         # Copy the pipeline template and save the copied template.
201         if pi.get('pipeline_template_uuid', None):
202             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
203                                         src, dst, args)
204
205         # Copy input collections, docker images and git repos.
206         pi = copy_collections(pi, src, dst, args)
207         copy_git_repos(pi, src, dst, args.dst_git_repo)
208         copy_docker_images(pi, src, dst, args)
209
210         # Update the fields of the pipeline instance with the copied
211         # pipeline template.
212         if pi.get('pipeline_template_uuid', None):
213             pi['pipeline_template_uuid'] = pt['uuid']
214
215     else:
216         # not recursive
217         logger.info("Copying only pipeline instance %s.", pi_uuid)
218         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
219
220     # Update the pipeline instance properties, and create the new
221     # instance at dst.
222     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
223     pi['description'] = "Pipeline copied from {}\n\n{}".format(
224         pi_uuid,
225         pi['description'] if pi.get('description', None) else '')
226
227     pi['owner_uuid'] = args.project_uuid
228
229     del pi['uuid']
230
231     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute()
232     return new_pi
233
234 # copy_pipeline_template(pt_uuid, src, dst, args)
235 #
236 #    Copies a pipeline template identified by pt_uuid from src to dst.
237 #
238 #    If args.recursive is True, also copy any collections, docker
239 #    images and git repositories that this template references.
240 #
241 #    The owner_uuid of the new template is changed to that of the user
242 #    who copied the template.
243 #
244 #    Returns the copied pipeline template object.
245 #
246 def copy_pipeline_template(pt_uuid, src, dst, args):
247     # fetch the pipeline template from the source instance
248     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
249
250     if args.recursive:
251         if not args.dst_git_repo:
252             abort('--dst-git-repo is required when copying a pipeline recursively.')
253         # Copy input collections, docker images and git repos.
254         pt = copy_collections(pt, src, dst, args)
255         copy_git_repos(pt, src, dst, args.dst_git_repo)
256         copy_docker_images(pt, src, dst, args)
257
258     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
259         pt_uuid,
260         pt['description'] if pt.get('description', None) else '')
261     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
262     del pt['uuid']
263
264     pt['owner_uuid'] = args.project_uuid
265
266     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute()
267
268 # copy_collections(obj, src, dst, args)
269 #
270 #    Recursively copies all collections referenced by 'obj' from src
271 #    to dst.  obj may be a dict or a list, in which case we run
272 #    copy_collections on every value it contains. If it is a string,
273 #    search it for any substring that matches a collection hash or uuid
274 #    (this will find hidden references to collections like
275 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
276 #
277 #    Returns a copy of obj with any old collection uuids replaced by
278 #    the new ones.
279 #
280 def copy_collections(obj, src, dst, args):
281
282     def copy_collection_fn(collection_match):
283         """Helper function for regex substitution: copies a single collection,
284         identified by the collection_match MatchObject, to the
285         destination.  Returns the destination collection uuid (or the
286         portable data hash if that's what src_id is).
287
288         """
289         src_id = collection_match.group(0)
290         if src_id not in collections_copied:
291             dst_col = copy_collection(src_id, src, dst, args)
292             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
293                 collections_copied[src_id] = src_id
294             else:
295                 collections_copied[src_id] = dst_col['uuid']
296         return collections_copied[src_id]
297
298     if isinstance(obj, basestring):
299         # Copy any collections identified in this string to dst, replacing
300         # them with the dst uuids as necessary.
301         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
302         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
303         return obj
304     elif type(obj) == dict:
305         return {v: copy_collections(obj[v], src, dst, args) for v in obj}
306     elif type(obj) == list:
307         return [copy_collections(v, src, dst, args) for v in obj]
308     return obj
309
310 # copy_git_repos(p, src, dst, dst_repo)
311 #
312 #    Copies all git repositories referenced by pipeline instance or
313 #    template 'p' from src to dst.
314 #
315 #    For each component c in the pipeline:
316 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
317 #      * Rename script versions:
318 #          * c['script_version']
319 #          * c['job']['script_version']
320 #          * c['job']['supplied_script_version']
321 #        to the commit hashes they resolve to, since any symbolic
322 #        names (tags, branches) are not preserved in the destination repo.
323 #
324 #    The pipeline object is updated in place with the new repository
325 #    names.  The return value is undefined.
326 #
327 def copy_git_repos(p, src, dst, dst_repo):
328     copied = set()
329     for c in p['components']:
330         component = p['components'][c]
331         if 'repository' in component:
332             repo = component['repository']
333             script_version = component.get('script_version', None)
334             if repo not in copied:
335                 copy_git_repo(repo, src, dst, dst_repo, script_version)
336                 copied.add(repo)
337             component['repository'] = dst_repo
338             if script_version:
339                 repo_dir = local_repo_dir[repo]
340                 component['script_version'] = git_rev_parse(script_version, repo_dir)
341         if 'job' in component:
342             j = component['job']
343             if 'repository' in j:
344                 repo = j['repository']
345                 script_version = j.get('script_version', None)
346                 if repo not in copied:
347                     copy_git_repo(repo, src, dst, dst_repo, script_version)
348                     copied.add(repo)
349                 j['repository'] = dst_repo
350                 repo_dir = local_repo_dir[repo]
351                 if script_version:
352                     j['script_version'] = git_rev_parse(script_version, repo_dir)
353                 if 'supplied_script_version' in j:
354                     j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
355
356 def total_collection_size(manifest_text):
357     """Return the total number of bytes in this collection (excluding
358     duplicate blocks)."""
359
360     total_bytes = 0
361     locators_seen = {}
362     for line in manifest_text.splitlines():
363         words = line.split()
364         for word in words[1:]:
365             try:
366                 loc = arvados.KeepLocator(word)
367             except ValueError:
368                 continue  # this word isn't a locator, skip it
369             if loc.md5sum not in locators_seen:
370                 locators_seen[loc.md5sum] = True
371                 total_bytes += loc.size
372
373     return total_bytes
374
375 def create_collection_from(c, src, dst, args):
376     """Create a new collection record on dst, and copy Docker metadata if
377     available."""
378
379     collection_uuid = c['uuid']
380
381     del c['uuid']
382
383     if 'properties' in c:
384         del c['properties']
385
386     c['owner_uuid'] = args.project_uuid
387
388     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
389
390     # Create docker_image_repo+tag and docker_image_hash links
391     # at the destination.
392     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
393         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
394
395         for d in docker_links:
396             body={
397                 'head_uuid': dst_collection['uuid'],
398                 'link_class': link_class,
399                 'name': d['name'],
400             }
401             body['owner_uuid'] = args.project_uuid
402
403             lk = dst.links().create(body=body).execute(num_retries=args.retries)
404             logger.debug('created dst link {}'.format(lk))
405
406     return dst_collection
407
408 # copy_collection(obj_uuid, src, dst, args)
409 #
410 #    Copies the collection identified by obj_uuid from src to dst.
411 #    Returns the collection object created at dst.
412 #
413 #    If args.progress is True, produce a human-friendly progress
414 #    report.
415 #
416 #    If a collection with the desired portable_data_hash already
417 #    exists at dst, and args.force is False, copy_collection returns
418 #    the existing collection without copying any blocks.  Otherwise
419 #    (if no collection exists or if args.force is True)
420 #    copy_collection copies all of the collection data blocks from src
421 #    to dst.
422 #
423 #    For this application, it is critical to preserve the
424 #    collection's manifest hash, which is not guaranteed with the
425 #    arvados.CollectionReader and arvados.CollectionWriter classes.
426 #    Copying each block in the collection manually, followed by
427 #    the manifest block, ensures that the collection's manifest
428 #    hash will not change.
429 #
430 def copy_collection(obj_uuid, src, dst, args):
431     c = src.collections().get(uuid=obj_uuid).execute()
432
433     # If a collection with this hash already exists at the
434     # destination, and 'force' is not true, just return that
435     # collection.
436     if not args.force:
437         if 'portable_data_hash' in c:
438             colhash = c['portable_data_hash']
439         else:
440             colhash = c['uuid']
441         dstcol = dst.collections().list(
442             filters=[['portable_data_hash', '=', colhash]]
443         ).execute()
444         if dstcol['items_available'] > 0:
445             for d in dstcol['items']:
446                 if ((args.project_uuid == d['owner_uuid']) and
447                     (c.get('name') == d['name']) and
448                     (c['portable_data_hash'] == d['portable_data_hash'])):
449                     return d
450
451             return create_collection_from(c, src, dst, args)
452
453     # Fetch the collection's manifest.
454     manifest = c['manifest_text']
455     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
456
457     # Copy each block from src_keep to dst_keep.
458     # Use the newly signed locators returned from dst_keep to build
459     # a new manifest as we go.
460     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
461     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
462     dst_manifest = ""
463     dst_locators = {}
464     bytes_written = 0
465     bytes_expected = total_collection_size(manifest)
466     if args.progress:
467         progress_writer = ProgressWriter(human_progress)
468     else:
469         progress_writer = None
470
471     for line in manifest.splitlines(True):
472         words = line.split()
473         dst_manifest_line = words[0]
474         for word in words[1:]:
475             try:
476                 loc = arvados.KeepLocator(word)
477                 blockhash = loc.md5sum
478                 # copy this block if we haven't seen it before
479                 # (otherwise, just reuse the existing dst_locator)
480                 if blockhash not in dst_locators:
481                     logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
482                     if progress_writer:
483                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
484                     data = src_keep.get(word)
485                     dst_locator = dst_keep.put(data)
486                     dst_locators[blockhash] = dst_locator
487                     bytes_written += loc.size
488                 dst_manifest_line += ' ' + dst_locators[blockhash]
489             except ValueError:
490                 # If 'word' can't be parsed as a locator,
491                 # presume it's a filename.
492                 dst_manifest_line += ' ' + word
493         dst_manifest += dst_manifest_line
494         if line.endswith("\n"):
495             dst_manifest += "\n"
496
497     if progress_writer:
498         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
499         progress_writer.finish()
500
501     # Copy the manifest and save the collection.
502     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
503
504     dst_keep.put(dst_manifest.encode('utf-8'))
505     c['manifest_text'] = dst_manifest
506     return create_collection_from(c, src, dst, args)
507
508 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version)
509 #
510 #    Copies commits from git repository 'src_git_repo' on Arvados
511 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
512 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
513 #    or "jsmith")
514 #
515 #    All commits will be copied to a destination branch named for the
516 #    source repository URL.
517 #
518 #    Because users cannot create their own repositories, the
519 #    destination repository must already exist.
520 #
521 #    The user running this command must be authenticated
522 #    to both repositories.
523 #
524 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version):
525     # Identify the fetch and push URLs for the git repositories.
526     r = src.repositories().list(
527         filters=[['name', '=', src_git_repo]]).execute()
528     if r['items_available'] != 1:
529         raise Exception('cannot identify source repo {}; {} repos found'
530                         .format(src_git_repo, r['items_available']))
531     src_git_url = r['items'][0]['fetch_url']
532     logger.debug('src_git_url: {}'.format(src_git_url))
533
534     r = dst.repositories().list(
535         filters=[['name', '=', dst_git_repo]]).execute()
536     if r['items_available'] != 1:
537         raise Exception('cannot identify destination repo {}; {} repos found'
538                         .format(dst_git_repo, r['items_available']))
539     dst_git_push_url  = r['items'][0]['push_url']
540     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
541
542     # script_version is the "script_version" parameter from the source
543     # component or job.  It is used here to tie the destination branch
544     # to the commit that was used on the source.  If no script_version
545     # was supplied in the component or job, it is a mistake in the pipeline,
546     # but for the purposes of copying the repository, default to "master".
547     #
548     if not script_version:
549         script_version = "master"
550
551     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
552
553     # Copy git commits from src repo to dst repo (but only if
554     # we have not already copied this repo in this session).
555     #
556     if src_git_repo in local_repo_dir:
557         logger.debug('already copied src repo %s, skipping', src_git_repo)
558     else:
559         tmprepo = tempfile.mkdtemp()
560         local_repo_dir[src_git_repo] = tmprepo
561         arvados.util.run_command(
562             ["git", "clone", "--bare", src_git_url, tmprepo],
563             cwd=os.path.dirname(tmprepo))
564         arvados.util.run_command(
565             ["git", "branch", dst_branch, script_version],
566             cwd=tmprepo)
567         arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
568         arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
569
570
571 def copy_docker_images(pipeline, src, dst, args):
572     """Copy any docker images named in the pipeline components'
573     runtime_constraints field from src to dst."""
574
575     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
576     for c_name, c_info in pipeline['components'].iteritems():
577         if ('runtime_constraints' in c_info and
578             'docker_image' in c_info['runtime_constraints']):
579             copy_docker_image(
580                 c_info['runtime_constraints']['docker_image'],
581                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
582                 src, dst, args)
583
584
585 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
586     """Copy the docker image identified by docker_image and
587     docker_image_tag from src to dst. Create appropriate
588     docker_image_repo+tag and docker_image_hash links at dst.
589
590     """
591
592     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
593
594     # Find the link identifying this docker image.
595     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
596         src, args.retries, docker_image, docker_image_tag)
597     image_uuid, image_info = docker_image_list[0]
598     logger.debug('copying collection {} {}'.format(image_uuid, image_info))
599
600     # Copy the collection it refers to.
601     dst_image_col = copy_collection(image_uuid, src, dst, args)
602
603
604 # git_rev_parse(rev, repo)
605 #
606 #    Returns the 40-character commit hash corresponding to 'rev' in
607 #    git repository 'repo' (which must be the path of a local git
608 #    repository)
609 #
610 def git_rev_parse(rev, repo):
611     gitout, giterr = arvados.util.run_command(
612         ['git', 'rev-parse', rev], cwd=repo)
613     return gitout.strip()
614
615 # uuid_type(api, object_uuid)
616 #
617 #    Returns the name of the class that object_uuid belongs to, based on
618 #    the second field of the uuid.  This function consults the api's
619 #    schema to identify the object class.
620 #
621 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
622 #
623 #    Special case: if handed a Keep locator hash, return 'Collection'.
624 #
625 def uuid_type(api, object_uuid):
626     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
627         return 'Collection'
628     p = object_uuid.split('-')
629     if len(p) == 3:
630         type_prefix = p[1]
631         for k in api._schema.schemas:
632             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
633             if type_prefix == obj_class:
634                 return k
635     return None
636
637 def abort(msg, code=1):
638     logger.info("arv-copy: %s", msg)
639     exit(code)
640
641
642 # Code for reporting on the progress of a collection upload.
643 # Stolen from arvados.commands.put.ArvPutCollectionWriter
644 # TODO(twp): figure out how to refactor into a shared library
645 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
646 # code)
647
648 def machine_progress(obj_uuid, bytes_written, bytes_expected):
649     return "{} {}: {} {} written {} total\n".format(
650         sys.argv[0],
651         os.getpid(),
652         obj_uuid,
653         bytes_written,
654         -1 if (bytes_expected is None) else bytes_expected)
655
656 def human_progress(obj_uuid, bytes_written, bytes_expected):
657     if bytes_expected:
658         return "\r{}: {}M / {}M {:.1%} ".format(
659             obj_uuid,
660             bytes_written >> 20, bytes_expected >> 20,
661             float(bytes_written) / bytes_expected)
662     else:
663         return "\r{}: {} ".format(obj_uuid, bytes_written)
664
665 class ProgressWriter(object):
666     _progress_func = None
667     outfile = sys.stderr
668
669     def __init__(self, progress_func):
670         self._progress_func = progress_func
671
672     def report(self, obj_uuid, bytes_written, bytes_expected):
673         if self._progress_func is not None:
674             self.outfile.write(
675                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
676
677     def finish(self):
678         self.outfile.write("\n")
679
680 if __name__ == '__main__':
681     main()