16578: Copy tutorials script WIP
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import sys
34 import logging
35 import tempfile
36 import urllib.parse
37
38 import arvados
39 import arvados.config
40 import arvados.keep
41 import arvados.util
42 import arvados.commands._util as arv_cmd
43 import arvados.commands.keepdocker
44 import ruamel.yaml as yaml
45
46 from arvados.api import OrderedJsonModel
47 from arvados._version import __version__
48
49 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
50
51 logger = logging.getLogger('arvados.arv-copy')
52
53 # local_repo_dir records which git repositories from the Arvados source
54 # instance have been checked out locally during this run, and to which
55 # directories.
56 # e.g. if repository 'twp' from src_arv has been cloned into
57 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
58 #
59 local_repo_dir = {}
60
61 # List of collections that have been copied in this session, and their
62 # destination collection UUIDs.
63 collections_copied = {}
64
65 # Set of (repository, script_version) two-tuples of commits copied in git.
66 scripts_copied = set()
67
68 # The owner_uuid of the object being copied
69 src_owner_uuid = None
70
71 def main():
72     copy_opts = argparse.ArgumentParser(add_help=False)
73
74     copy_opts.add_argument(
75         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
76         help='Print version and exit.')
77     copy_opts.add_argument(
78         '-v', '--verbose', dest='verbose', action='store_true',
79         help='Verbose output.')
80     copy_opts.add_argument(
81         '--progress', dest='progress', action='store_true',
82         help='Report progress on copying collections. (default)')
83     copy_opts.add_argument(
84         '--no-progress', dest='progress', action='store_false',
85         help='Do not report progress on copying collections.')
86     copy_opts.add_argument(
87         '-f', '--force', dest='force', action='store_true',
88         help='Perform copy even if the object appears to exist at the remote destination.')
89     copy_opts.add_argument(
90         '--src', dest='source_arvados', required=True,
91         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
92     copy_opts.add_argument(
93         '--dst', dest='destination_arvados', required=True,
94         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
95     copy_opts.add_argument(
96         '--recursive', dest='recursive', action='store_true',
97         help='Recursively copy any dependencies for this object. (default)')
98     copy_opts.add_argument(
99         '--no-recursive', dest='recursive', action='store_false',
100         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
101     copy_opts.add_argument(
102         '--project-uuid', dest='project_uuid',
103         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
104
105     copy_opts.add_argument(
106         'object_uuid',
107         help='The UUID of the object to be copied.')
108     copy_opts.set_defaults(progress=True)
109     copy_opts.set_defaults(recursive=True)
110
111     parser = argparse.ArgumentParser(
112         description='Copy a workflow or collection from one Arvados instance to another.',
113         parents=[copy_opts, arv_cmd.retry_opt])
114     args = parser.parse_args()
115
116     if args.verbose:
117         logger.setLevel(logging.DEBUG)
118     else:
119         logger.setLevel(logging.INFO)
120
121     # Create API clients for the source and destination instances
122     src_arv = api_for_instance(args.source_arvados)
123     dst_arv = api_for_instance(args.destination_arvados)
124
125     if not args.project_uuid:
126         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
127
128     # Identify the kind of object we have been given, and begin copying.
129     t = uuid_type(src_arv, args.object_uuid)
130     if t == 'Collection':
131         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
132         result = copy_collection(args.object_uuid,
133                                  src_arv, dst_arv,
134                                  args)
135     elif t == 'Workflow':
136         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
137         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
138     else:
139         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
140
141     # Clean up any outstanding temp git repositories.
142     for d in listvalues(local_repo_dir):
143         shutil.rmtree(d, ignore_errors=True)
144
145     # If no exception was thrown and the response does not have an
146     # error_token field, presume success
147     if 'error_token' in result or 'uuid' not in result:
148         logger.error("API server returned an error result: {}".format(result))
149         exit(1)
150
151     logger.info("")
152     logger.info("Success: created copy with uuid {}".format(result['uuid']))
153     exit(0)
154
155 def set_src_owner_uuid(resource, uuid, args):
156     global src_owner_uuid
157     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
158     src_owner_uuid = c.get("owner_uuid")
159
160 # api_for_instance(instance_name)
161 #
162 #     Creates an API client for the Arvados instance identified by
163 #     instance_name.
164 #
165 #     If instance_name contains a slash, it is presumed to be a path
166 #     (either local or absolute) to a file with Arvados configuration
167 #     settings.
168 #
169 #     Otherwise, it is presumed to be the name of a file in
170 #     $HOME/.config/arvados/instance_name.conf
171 #
172 def api_for_instance(instance_name):
173     if '/' in instance_name:
174         config_file = instance_name
175     else:
176         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
177
178     try:
179         cfg = arvados.config.load(config_file)
180     except (IOError, OSError) as e:
181         abort(("Could not open config file {}: {}\n" +
182                "You must make sure that your configuration tokens\n" +
183                "for Arvados instance {} are in {} and that this\n" +
184                "file is readable.").format(
185                    config_file, e, instance_name, config_file))
186
187     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
188         api_is_insecure = (
189             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
190                 ['1', 't', 'true', 'y', 'yes']))
191         client = arvados.api('v1',
192                              host=cfg['ARVADOS_API_HOST'],
193                              token=cfg['ARVADOS_API_TOKEN'],
194                              insecure=api_is_insecure,
195                              model=OrderedJsonModel())
196     else:
197         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
198     return client
199
200 # Check if git is available
201 def check_git_availability():
202     try:
203         arvados.util.run_command(['git', '--help'])
204     except Exception:
205         abort('git command is not available. Please ensure git is installed.')
206
207
208 def filter_iter(arg):
209     """Iterate a filter string-or-list.
210
211     Pass in a filter field that can either be a string or list.
212     This will iterate elements as if the field had been written as a list.
213     """
214     if isinstance(arg, basestring):
215         return iter((arg,))
216     else:
217         return iter(arg)
218
219 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
220     """Update a single repository filter in-place for the destination.
221
222     If the filter checks that the repository is src_repository, it is
223     updated to check that the repository is dst_repository.  If it does
224     anything else, this function raises ValueError.
225     """
226     if src_repository is None:
227         raise ValueError("component does not specify a source repository")
228     elif dst_repository is None:
229         raise ValueError("no destination repository specified to update repository filter")
230     elif repo_filter[1:] == ['=', src_repository]:
231         repo_filter[2] = dst_repository
232     elif repo_filter[1:] == ['in', [src_repository]]:
233         repo_filter[2] = [dst_repository]
234     else:
235         raise ValueError("repository filter is not a simple source match")
236
237 def migrate_script_version_filter(version_filter):
238     """Update a single script_version filter in-place for the destination.
239
240     Currently this function checks that all the filter operands are Git
241     commit hashes.  If they're not, it raises ValueError to indicate that
242     the filter is not portable.  It could be extended to make other
243     transformations in the future.
244     """
245     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
246         raise ValueError("script_version filter is not limited to commit hashes")
247
248 def attr_filtered(filter_, *attr_names):
249     """Return True if filter_ applies to any of attr_names, else False."""
250     return any((name == 'any') or (name in attr_names)
251                for name in filter_iter(filter_[0]))
252
253 @contextlib.contextmanager
254 def exception_handler(handler, *exc_types):
255     """If any exc_types are raised in the block, call handler on the exception."""
256     try:
257         yield
258     except exc_types as error:
259         handler(error)
260
261
262 # copy_workflow(wf_uuid, src, dst, args)
263 #
264 #    Copies a workflow identified by wf_uuid from src to dst.
265 #
266 #    If args.recursive is True, also copy any collections
267 #      referenced in the workflow definition yaml.
268 #
269 #    The owner_uuid of the new workflow is set to any given
270 #      project_uuid or the user who copied the template.
271 #
272 #    Returns the copied workflow object.
273 #
274 def copy_workflow(wf_uuid, src, dst, args):
275     # fetch the workflow from the source instance
276     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
277
278     # copy collections and docker images
279     if args.recursive:
280         wf_def = yaml.safe_load(wf["definition"])
281         if wf_def is not None:
282             locations = []
283             docker_images = {}
284             graph = wf_def.get('$graph', None)
285             if graph is not None:
286                 workflow_collections(graph, locations, docker_images)
287             else:
288                 workflow_collections(wf_def, locations, docker_images)
289
290             if locations:
291                 copy_collections(locations, src, dst, args)
292
293             for image in docker_images:
294                 copy_docker_image(image, docker_images[image], src, dst, args)
295
296     # copy the workflow itself
297     del wf['uuid']
298     wf['owner_uuid'] = args.project_uuid
299     return dst.workflows().create(body=wf).execute(num_retries=args.retries)
300
301 def workflow_collections(obj, locations, docker_images):
302     if isinstance(obj, dict):
303         loc = obj.get('location', None)
304         if loc is not None:
305             if loc.startswith("keep:"):
306                 locations.append(loc[5:])
307
308         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
309         if docker_image is not None:
310             ds = docker_image.split(":", 1)
311             tag = ds[1] if len(ds)==2 else 'latest'
312             docker_images[ds[0]] = tag
313
314         for x in obj:
315             workflow_collections(obj[x], locations, docker_images)
316     elif isinstance(obj, list):
317         for x in obj:
318             workflow_collections(x, locations, docker_images)
319
320 # copy_collections(obj, src, dst, args)
321 #
322 #    Recursively copies all collections referenced by 'obj' from src
323 #    to dst.  obj may be a dict or a list, in which case we run
324 #    copy_collections on every value it contains. If it is a string,
325 #    search it for any substring that matches a collection hash or uuid
326 #    (this will find hidden references to collections like
327 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
328 #
329 #    Returns a copy of obj with any old collection uuids replaced by
330 #    the new ones.
331 #
332 def copy_collections(obj, src, dst, args):
333
334     def copy_collection_fn(collection_match):
335         """Helper function for regex substitution: copies a single collection,
336         identified by the collection_match MatchObject, to the
337         destination.  Returns the destination collection uuid (or the
338         portable data hash if that's what src_id is).
339
340         """
341         src_id = collection_match.group(0)
342         if src_id not in collections_copied:
343             dst_col = copy_collection(src_id, src, dst, args)
344             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
345                 collections_copied[src_id] = src_id
346             else:
347                 collections_copied[src_id] = dst_col['uuid']
348         return collections_copied[src_id]
349
350     if isinstance(obj, basestring):
351         # Copy any collections identified in this string to dst, replacing
352         # them with the dst uuids as necessary.
353         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
354         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
355         return obj
356     elif isinstance(obj, dict):
357         return type(obj)((v, copy_collections(obj[v], src, dst, args))
358                          for v in obj)
359     elif isinstance(obj, list):
360         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
361     return obj
362
363
364 def total_collection_size(manifest_text):
365     """Return the total number of bytes in this collection (excluding
366     duplicate blocks)."""
367
368     total_bytes = 0
369     locators_seen = {}
370     for line in manifest_text.splitlines():
371         words = line.split()
372         for word in words[1:]:
373             try:
374                 loc = arvados.KeepLocator(word)
375             except ValueError:
376                 continue  # this word isn't a locator, skip it
377             if loc.md5sum not in locators_seen:
378                 locators_seen[loc.md5sum] = True
379                 total_bytes += loc.size
380
381     return total_bytes
382
383 def create_collection_from(c, src, dst, args):
384     """Create a new collection record on dst, and copy Docker metadata if
385     available."""
386
387     collection_uuid = c['uuid']
388     body = {}
389     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
390         body[d] = c[d]
391
392     if not body["name"]:
393         body['name'] = "copied from " + collection_uuid
394
395     body['owner_uuid'] = args.project_uuid
396
397     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
398
399     # Create docker_image_repo+tag and docker_image_hash links
400     # at the destination.
401     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
402         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
403
404         for src_link in docker_links:
405             body = {key: src_link[key]
406                     for key in ['link_class', 'name', 'properties']}
407             body['head_uuid'] = dst_collection['uuid']
408             body['owner_uuid'] = args.project_uuid
409
410             lk = dst.links().create(body=body).execute(num_retries=args.retries)
411             logger.debug('created dst link {}'.format(lk))
412
413     return dst_collection
414
415 # copy_collection(obj_uuid, src, dst, args)
416 #
417 #    Copies the collection identified by obj_uuid from src to dst.
418 #    Returns the collection object created at dst.
419 #
420 #    If args.progress is True, produce a human-friendly progress
421 #    report.
422 #
423 #    If a collection with the desired portable_data_hash already
424 #    exists at dst, and args.force is False, copy_collection returns
425 #    the existing collection without copying any blocks.  Otherwise
426 #    (if no collection exists or if args.force is True)
427 #    copy_collection copies all of the collection data blocks from src
428 #    to dst.
429 #
430 #    For this application, it is critical to preserve the
431 #    collection's manifest hash, which is not guaranteed with the
432 #    arvados.CollectionReader and arvados.CollectionWriter classes.
433 #    Copying each block in the collection manually, followed by
434 #    the manifest block, ensures that the collection's manifest
435 #    hash will not change.
436 #
437 def copy_collection(obj_uuid, src, dst, args):
438     if arvados.util.keep_locator_pattern.match(obj_uuid):
439         # If the obj_uuid is a portable data hash, it might not be
440         # uniquely identified with a particular collection.  As a
441         # result, it is ambiguous as to what name to use for the copy.
442         # Apply some heuristics to pick which collection to get the
443         # name from.
444         srccol = src.collections().list(
445             filters=[['portable_data_hash', '=', obj_uuid]],
446             order="created_at asc"
447             ).execute(num_retries=args.retries)
448
449         items = srccol.get("items")
450
451         if not items:
452             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
453             return
454
455         c = None
456
457         if len(items) == 1:
458             # There's only one collection with the PDH, so use that.
459             c = items[0]
460         if not c:
461             # See if there is a collection that's in the same project
462             # as the root item (usually a workflow) being copied.
463             for i in items:
464                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
465                     c = i
466                     break
467         if not c:
468             # Didn't find any collections located in the same project, so
469             # pick the oldest collection that has a name assigned to it.
470             for i in items:
471                 if i.get("name"):
472                     c = i
473                     break
474         if not c:
475             # None of the collections have names (?!), so just pick the
476             # first one.
477             c = items[0]
478
479         # list() doesn't return manifest text (and we don't want it to,
480         # because we don't need the same maninfest text sent to us 50
481         # times) so go and retrieve the collection object directly
482         # which will include the manifest text.
483         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
484     else:
485         # Assume this is an actual collection uuid, so fetch it directly.
486         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
487
488     # If a collection with this hash already exists at the
489     # destination, and 'force' is not true, just return that
490     # collection.
491     if not args.force:
492         if 'portable_data_hash' in c:
493             colhash = c['portable_data_hash']
494         else:
495             colhash = c['uuid']
496         dstcol = dst.collections().list(
497             filters=[['portable_data_hash', '=', colhash]]
498         ).execute(num_retries=args.retries)
499         if dstcol['items_available'] > 0:
500             for d in dstcol['items']:
501                 if ((args.project_uuid == d['owner_uuid']) and
502                     (c.get('name') == d['name']) and
503                     (c['portable_data_hash'] == d['portable_data_hash'])):
504                     return d
505             c['manifest_text'] = dst.collections().get(
506                 uuid=dstcol['items'][0]['uuid']
507             ).execute(num_retries=args.retries)['manifest_text']
508             return create_collection_from(c, src, dst, args)
509
510     # Fetch the collection's manifest.
511     manifest = c['manifest_text']
512     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
513
514     # Copy each block from src_keep to dst_keep.
515     # Use the newly signed locators returned from dst_keep to build
516     # a new manifest as we go.
517     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
518     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
519     dst_manifest = ""
520     dst_locators = {}
521     bytes_written = 0
522     bytes_expected = total_collection_size(manifest)
523     if args.progress:
524         progress_writer = ProgressWriter(human_progress)
525     else:
526         progress_writer = None
527
528     for line in manifest.splitlines():
529         words = line.split()
530         dst_manifest += words[0]
531         for word in words[1:]:
532             try:
533                 loc = arvados.KeepLocator(word)
534             except ValueError:
535                 # If 'word' can't be parsed as a locator,
536                 # presume it's a filename.
537                 dst_manifest += ' ' + word
538                 continue
539             blockhash = loc.md5sum
540             # copy this block if we haven't seen it before
541             # (otherwise, just reuse the existing dst_locator)
542             if blockhash not in dst_locators:
543                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
544                 if progress_writer:
545                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
546                 data = src_keep.get(word)
547                 dst_locator = dst_keep.put(data)
548                 dst_locators[blockhash] = dst_locator
549                 bytes_written += loc.size
550             dst_manifest += ' ' + dst_locators[blockhash]
551         dst_manifest += "\n"
552
553     if progress_writer:
554         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
555         progress_writer.finish()
556
557     # Copy the manifest and save the collection.
558     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
559
560     c['manifest_text'] = dst_manifest
561     return create_collection_from(c, src, dst, args)
562
563 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
564     r = api.repositories().list(
565         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
566     if r['items_available'] != 1:
567         raise Exception('cannot identify repo {}; {} repos found'
568                         .format(repo_name, r['items_available']))
569
570     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
571     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
572     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
573
574     priority = https_url + other_url + http_url
575
576     git_config = []
577     git_url = None
578     for url in priority:
579         if url.startswith("http"):
580             u = urllib.parse.urlsplit(url)
581             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
582             git_config = ["-c", "credential.%s/.username=none" % baseurl,
583                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
584         else:
585             git_config = []
586
587         try:
588             logger.debug("trying %s", url)
589             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
590                                       env={"HOME": os.environ["HOME"],
591                                            "ARVADOS_API_TOKEN": api.api_token,
592                                            "GIT_ASKPASS": "/bin/false"})
593         except arvados.errors.CommandFailedError:
594             pass
595         else:
596             git_url = url
597             break
598
599     if not git_url:
600         raise Exception('Cannot access git repository, tried {}'
601                         .format(priority))
602
603     if git_url.startswith("http:"):
604         if allow_insecure_http:
605             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
606         else:
607             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
608
609     return (git_url, git_config)
610
611
612 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
613     """Copy the docker image identified by docker_image and
614     docker_image_tag from src to dst. Create appropriate
615     docker_image_repo+tag and docker_image_hash links at dst.
616
617     """
618
619     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
620
621     # Find the link identifying this docker image.
622     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
623         src, args.retries, docker_image, docker_image_tag)
624     if docker_image_list:
625         image_uuid, image_info = docker_image_list[0]
626         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
627
628         # Copy the collection it refers to.
629         dst_image_col = copy_collection(image_uuid, src, dst, args)
630     elif arvados.util.keep_locator_pattern.match(docker_image):
631         dst_image_col = copy_collection(docker_image, src, dst, args)
632     else:
633         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
634
635 # git_rev_parse(rev, repo)
636 #
637 #    Returns the 40-character commit hash corresponding to 'rev' in
638 #    git repository 'repo' (which must be the path of a local git
639 #    repository)
640 #
641 def git_rev_parse(rev, repo):
642     gitout, giterr = arvados.util.run_command(
643         ['git', 'rev-parse', rev], cwd=repo)
644     return gitout.strip()
645
646 # uuid_type(api, object_uuid)
647 #
648 #    Returns the name of the class that object_uuid belongs to, based on
649 #    the second field of the uuid.  This function consults the api's
650 #    schema to identify the object class.
651 #
652 #    It returns a string such as 'Collection', 'Workflow', etc.
653 #
654 #    Special case: if handed a Keep locator hash, return 'Collection'.
655 #
656 def uuid_type(api, object_uuid):
657     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
658         return 'Collection'
659     p = object_uuid.split('-')
660     if len(p) == 3:
661         type_prefix = p[1]
662         for k in api._schema.schemas:
663             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
664             if type_prefix == obj_class:
665                 return k
666     return None
667
668 def abort(msg, code=1):
669     logger.info("arv-copy: %s", msg)
670     exit(code)
671
672
673 # Code for reporting on the progress of a collection upload.
674 # Stolen from arvados.commands.put.ArvPutCollectionWriter
675 # TODO(twp): figure out how to refactor into a shared library
676 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
677 # code)
678
679 def machine_progress(obj_uuid, bytes_written, bytes_expected):
680     return "{} {}: {} {} written {} total\n".format(
681         sys.argv[0],
682         os.getpid(),
683         obj_uuid,
684         bytes_written,
685         -1 if (bytes_expected is None) else bytes_expected)
686
687 def human_progress(obj_uuid, bytes_written, bytes_expected):
688     if bytes_expected:
689         return "\r{}: {}M / {}M {:.1%} ".format(
690             obj_uuid,
691             bytes_written >> 20, bytes_expected >> 20,
692             float(bytes_written) / bytes_expected)
693     else:
694         return "\r{}: {} ".format(obj_uuid, bytes_written)
695
696 class ProgressWriter(object):
697     _progress_func = None
698     outfile = sys.stderr
699
700     def __init__(self, progress_func):
701         self._progress_func = progress_func
702
703     def report(self, obj_uuid, bytes_written, bytes_expected):
704         if self._progress_func is not None:
705             self.outfile.write(
706                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
707
708     def finish(self):
709         self.outfile.write("\n")
710
711 if __name__ == '__main__':
712     main()