20839: Replace arvados.util.run_command calls in arv-copy
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import subprocess
34 import sys
35 import logging
36 import tempfile
37 import urllib.parse
38 import io
39
40 import arvados
41 import arvados.config
42 import arvados.keep
43 import arvados.util
44 import arvados.commands._util as arv_cmd
45 import arvados.commands.keepdocker
46 import ruamel.yaml as yaml
47
48 from arvados.api import OrderedJsonModel
49 from arvados._version import __version__
50
51 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
52
53 logger = logging.getLogger('arvados.arv-copy')
54
55 # local_repo_dir records which git repositories from the Arvados source
56 # instance have been checked out locally during this run, and to which
57 # directories.
58 # e.g. if repository 'twp' from src_arv has been cloned into
59 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
60 #
61 local_repo_dir = {}
62
63 # List of collections that have been copied in this session, and their
64 # destination collection UUIDs.
65 collections_copied = {}
66
67 # Set of (repository, script_version) two-tuples of commits copied in git.
68 scripts_copied = set()
69
70 # The owner_uuid of the object being copied
71 src_owner_uuid = None
72
73 def main():
74     copy_opts = argparse.ArgumentParser(add_help=False)
75
76     copy_opts.add_argument(
77         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
78         help='Print version and exit.')
79     copy_opts.add_argument(
80         '-v', '--verbose', dest='verbose', action='store_true',
81         help='Verbose output.')
82     copy_opts.add_argument(
83         '--progress', dest='progress', action='store_true',
84         help='Report progress on copying collections. (default)')
85     copy_opts.add_argument(
86         '--no-progress', dest='progress', action='store_false',
87         help='Do not report progress on copying collections.')
88     copy_opts.add_argument(
89         '-f', '--force', dest='force', action='store_true',
90         help='Perform copy even if the object appears to exist at the remote destination.')
91     copy_opts.add_argument(
92         '--src', dest='source_arvados',
93         help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
94     copy_opts.add_argument(
95         '--dst', dest='destination_arvados',
96         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
97     copy_opts.add_argument(
98         '--recursive', dest='recursive', action='store_true',
99         help='Recursively copy any dependencies for this object, and subprojects. (default)')
100     copy_opts.add_argument(
101         '--no-recursive', dest='recursive', action='store_false',
102         help='Do not copy any dependencies or subprojects.')
103     copy_opts.add_argument(
104         '--project-uuid', dest='project_uuid',
105         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
106     copy_opts.add_argument(
107         '--storage-classes', dest='storage_classes',
108         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
109
110     copy_opts.add_argument(
111         'object_uuid',
112         help='The UUID of the object to be copied.')
113     copy_opts.set_defaults(progress=True)
114     copy_opts.set_defaults(recursive=True)
115
116     parser = argparse.ArgumentParser(
117         description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
118         parents=[copy_opts, arv_cmd.retry_opt])
119     args = parser.parse_args()
120
121     if args.storage_classes:
122         args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
123
124     if args.verbose:
125         logger.setLevel(logging.DEBUG)
126     else:
127         logger.setLevel(logging.INFO)
128
129     if not args.source_arvados:
130         args.source_arvados = args.object_uuid[:5]
131
132     # Create API clients for the source and destination instances
133     src_arv = api_for_instance(args.source_arvados, args.retries)
134     dst_arv = api_for_instance(args.destination_arvados, args.retries)
135
136     if not args.project_uuid:
137         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
138
139     # Identify the kind of object we have been given, and begin copying.
140     t = uuid_type(src_arv, args.object_uuid)
141     if t == 'Collection':
142         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
143         result = copy_collection(args.object_uuid,
144                                  src_arv, dst_arv,
145                                  args)
146     elif t == 'Workflow':
147         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
148         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
149     elif t == 'Group':
150         set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
151         result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
152     else:
153         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
154
155     # Clean up any outstanding temp git repositories.
156     for d in listvalues(local_repo_dir):
157         shutil.rmtree(d, ignore_errors=True)
158
159     # If no exception was thrown and the response does not have an
160     # error_token field, presume success
161     if 'error_token' in result or 'uuid' not in result:
162         logger.error("API server returned an error result: {}".format(result))
163         exit(1)
164
165     print(result['uuid'])
166
167     if result.get('partial_error'):
168         logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
169         exit(1)
170
171     logger.info("Success: created copy with uuid {}".format(result['uuid']))
172     exit(0)
173
174 def set_src_owner_uuid(resource, uuid, args):
175     global src_owner_uuid
176     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
177     src_owner_uuid = c.get("owner_uuid")
178
179 # api_for_instance(instance_name)
180 #
181 #     Creates an API client for the Arvados instance identified by
182 #     instance_name.
183 #
184 #     If instance_name contains a slash, it is presumed to be a path
185 #     (either local or absolute) to a file with Arvados configuration
186 #     settings.
187 #
188 #     Otherwise, it is presumed to be the name of a file in
189 #     $HOME/.config/arvados/instance_name.conf
190 #
191 def api_for_instance(instance_name, num_retries):
192     if not instance_name:
193         # Use environment
194         return arvados.api('v1', model=OrderedJsonModel())
195
196     if '/' in instance_name:
197         config_file = instance_name
198     else:
199         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
200
201     try:
202         cfg = arvados.config.load(config_file)
203     except (IOError, OSError) as e:
204         abort(("Could not open config file {}: {}\n" +
205                "You must make sure that your configuration tokens\n" +
206                "for Arvados instance {} are in {} and that this\n" +
207                "file is readable.").format(
208                    config_file, e, instance_name, config_file))
209
210     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
211         api_is_insecure = (
212             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
213                 ['1', 't', 'true', 'y', 'yes']))
214         client = arvados.api('v1',
215                              host=cfg['ARVADOS_API_HOST'],
216                              token=cfg['ARVADOS_API_TOKEN'],
217                              insecure=api_is_insecure,
218                              model=OrderedJsonModel(),
219                              num_retries=num_retries,
220                              )
221     else:
222         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
223     return client
224
225 # Check if git is available
226 def check_git_availability():
227     try:
228         subprocess.run(
229             ['git', '--version'],
230             check=True,
231             stdout=subprocess.DEVNULL,
232         )
233     except FileNotFoundError:
234         abort('git command is not available. Please ensure git is installed.')
235
236
237 def filter_iter(arg):
238     """Iterate a filter string-or-list.
239
240     Pass in a filter field that can either be a string or list.
241     This will iterate elements as if the field had been written as a list.
242     """
243     if isinstance(arg, basestring):
244         return iter((arg,))
245     else:
246         return iter(arg)
247
248 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
249     """Update a single repository filter in-place for the destination.
250
251     If the filter checks that the repository is src_repository, it is
252     updated to check that the repository is dst_repository.  If it does
253     anything else, this function raises ValueError.
254     """
255     if src_repository is None:
256         raise ValueError("component does not specify a source repository")
257     elif dst_repository is None:
258         raise ValueError("no destination repository specified to update repository filter")
259     elif repo_filter[1:] == ['=', src_repository]:
260         repo_filter[2] = dst_repository
261     elif repo_filter[1:] == ['in', [src_repository]]:
262         repo_filter[2] = [dst_repository]
263     else:
264         raise ValueError("repository filter is not a simple source match")
265
266 def migrate_script_version_filter(version_filter):
267     """Update a single script_version filter in-place for the destination.
268
269     Currently this function checks that all the filter operands are Git
270     commit hashes.  If they're not, it raises ValueError to indicate that
271     the filter is not portable.  It could be extended to make other
272     transformations in the future.
273     """
274     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
275         raise ValueError("script_version filter is not limited to commit hashes")
276
277 def attr_filtered(filter_, *attr_names):
278     """Return True if filter_ applies to any of attr_names, else False."""
279     return any((name == 'any') or (name in attr_names)
280                for name in filter_iter(filter_[0]))
281
282 @contextlib.contextmanager
283 def exception_handler(handler, *exc_types):
284     """If any exc_types are raised in the block, call handler on the exception."""
285     try:
286         yield
287     except exc_types as error:
288         handler(error)
289
290
291 # copy_workflow(wf_uuid, src, dst, args)
292 #
293 #    Copies a workflow identified by wf_uuid from src to dst.
294 #
295 #    If args.recursive is True, also copy any collections
296 #      referenced in the workflow definition yaml.
297 #
298 #    The owner_uuid of the new workflow is set to any given
299 #      project_uuid or the user who copied the template.
300 #
301 #    Returns the copied workflow object.
302 #
303 def copy_workflow(wf_uuid, src, dst, args):
304     # fetch the workflow from the source instance
305     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
306
307     if not wf["definition"]:
308         logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
309
310     # copy collections and docker images
311     if args.recursive and wf["definition"]:
312         wf_def = yaml.safe_load(wf["definition"])
313         if wf_def is not None:
314             locations = []
315             docker_images = {}
316             graph = wf_def.get('$graph', None)
317             if graph is not None:
318                 workflow_collections(graph, locations, docker_images)
319             else:
320                 workflow_collections(wf_def, locations, docker_images)
321
322             if locations:
323                 copy_collections(locations, src, dst, args)
324
325             for image in docker_images:
326                 copy_docker_image(image, docker_images[image], src, dst, args)
327
328     # copy the workflow itself
329     del wf['uuid']
330     wf['owner_uuid'] = args.project_uuid
331
332     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
333                                              ["name", "=", wf["name"]]]).execute()
334     if len(existing["items"]) == 0:
335         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
336     else:
337         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
338
339
340 def workflow_collections(obj, locations, docker_images):
341     if isinstance(obj, dict):
342         loc = obj.get('location', None)
343         if loc is not None:
344             if loc.startswith("keep:"):
345                 locations.append(loc[5:])
346
347         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
348         if docker_image is not None:
349             ds = docker_image.split(":", 1)
350             tag = ds[1] if len(ds)==2 else 'latest'
351             docker_images[ds[0]] = tag
352
353         for x in obj:
354             workflow_collections(obj[x], locations, docker_images)
355     elif isinstance(obj, list):
356         for x in obj:
357             workflow_collections(x, locations, docker_images)
358
359 # copy_collections(obj, src, dst, args)
360 #
361 #    Recursively copies all collections referenced by 'obj' from src
362 #    to dst.  obj may be a dict or a list, in which case we run
363 #    copy_collections on every value it contains. If it is a string,
364 #    search it for any substring that matches a collection hash or uuid
365 #    (this will find hidden references to collections like
366 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
367 #
368 #    Returns a copy of obj with any old collection uuids replaced by
369 #    the new ones.
370 #
371 def copy_collections(obj, src, dst, args):
372
373     def copy_collection_fn(collection_match):
374         """Helper function for regex substitution: copies a single collection,
375         identified by the collection_match MatchObject, to the
376         destination.  Returns the destination collection uuid (or the
377         portable data hash if that's what src_id is).
378
379         """
380         src_id = collection_match.group(0)
381         if src_id not in collections_copied:
382             dst_col = copy_collection(src_id, src, dst, args)
383             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
384                 collections_copied[src_id] = src_id
385             else:
386                 collections_copied[src_id] = dst_col['uuid']
387         return collections_copied[src_id]
388
389     if isinstance(obj, basestring):
390         # Copy any collections identified in this string to dst, replacing
391         # them with the dst uuids as necessary.
392         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
393         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
394         return obj
395     elif isinstance(obj, dict):
396         return type(obj)((v, copy_collections(obj[v], src, dst, args))
397                          for v in obj)
398     elif isinstance(obj, list):
399         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
400     return obj
401
402
403 def total_collection_size(manifest_text):
404     """Return the total number of bytes in this collection (excluding
405     duplicate blocks)."""
406
407     total_bytes = 0
408     locators_seen = {}
409     for line in manifest_text.splitlines():
410         words = line.split()
411         for word in words[1:]:
412             try:
413                 loc = arvados.KeepLocator(word)
414             except ValueError:
415                 continue  # this word isn't a locator, skip it
416             if loc.md5sum not in locators_seen:
417                 locators_seen[loc.md5sum] = True
418                 total_bytes += loc.size
419
420     return total_bytes
421
422 def create_collection_from(c, src, dst, args):
423     """Create a new collection record on dst, and copy Docker metadata if
424     available."""
425
426     collection_uuid = c['uuid']
427     body = {}
428     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
429         body[d] = c[d]
430
431     if not body["name"]:
432         body['name'] = "copied from " + collection_uuid
433
434     if args.storage_classes:
435         body['storage_classes_desired'] = args.storage_classes
436
437     body['owner_uuid'] = args.project_uuid
438
439     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
440
441     # Create docker_image_repo+tag and docker_image_hash links
442     # at the destination.
443     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
444         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
445
446         for src_link in docker_links:
447             body = {key: src_link[key]
448                     for key in ['link_class', 'name', 'properties']}
449             body['head_uuid'] = dst_collection['uuid']
450             body['owner_uuid'] = args.project_uuid
451
452             lk = dst.links().create(body=body).execute(num_retries=args.retries)
453             logger.debug('created dst link {}'.format(lk))
454
455     return dst_collection
456
457 # copy_collection(obj_uuid, src, dst, args)
458 #
459 #    Copies the collection identified by obj_uuid from src to dst.
460 #    Returns the collection object created at dst.
461 #
462 #    If args.progress is True, produce a human-friendly progress
463 #    report.
464 #
465 #    If a collection with the desired portable_data_hash already
466 #    exists at dst, and args.force is False, copy_collection returns
467 #    the existing collection without copying any blocks.  Otherwise
468 #    (if no collection exists or if args.force is True)
469 #    copy_collection copies all of the collection data blocks from src
470 #    to dst.
471 #
472 #    For this application, it is critical to preserve the
473 #    collection's manifest hash, which is not guaranteed with the
474 #    arvados.CollectionReader and arvados.CollectionWriter classes.
475 #    Copying each block in the collection manually, followed by
476 #    the manifest block, ensures that the collection's manifest
477 #    hash will not change.
478 #
479 def copy_collection(obj_uuid, src, dst, args):
480     if arvados.util.keep_locator_pattern.match(obj_uuid):
481         # If the obj_uuid is a portable data hash, it might not be
482         # uniquely identified with a particular collection.  As a
483         # result, it is ambiguous as to what name to use for the copy.
484         # Apply some heuristics to pick which collection to get the
485         # name from.
486         srccol = src.collections().list(
487             filters=[['portable_data_hash', '=', obj_uuid]],
488             order="created_at asc"
489             ).execute(num_retries=args.retries)
490
491         items = srccol.get("items")
492
493         if not items:
494             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
495             return
496
497         c = None
498
499         if len(items) == 1:
500             # There's only one collection with the PDH, so use that.
501             c = items[0]
502         if not c:
503             # See if there is a collection that's in the same project
504             # as the root item (usually a workflow) being copied.
505             for i in items:
506                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
507                     c = i
508                     break
509         if not c:
510             # Didn't find any collections located in the same project, so
511             # pick the oldest collection that has a name assigned to it.
512             for i in items:
513                 if i.get("name"):
514                     c = i
515                     break
516         if not c:
517             # None of the collections have names (?!), so just pick the
518             # first one.
519             c = items[0]
520
521         # list() doesn't return manifest text (and we don't want it to,
522         # because we don't need the same maninfest text sent to us 50
523         # times) so go and retrieve the collection object directly
524         # which will include the manifest text.
525         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
526     else:
527         # Assume this is an actual collection uuid, so fetch it directly.
528         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
529
530     # If a collection with this hash already exists at the
531     # destination, and 'force' is not true, just return that
532     # collection.
533     if not args.force:
534         if 'portable_data_hash' in c:
535             colhash = c['portable_data_hash']
536         else:
537             colhash = c['uuid']
538         dstcol = dst.collections().list(
539             filters=[['portable_data_hash', '=', colhash]]
540         ).execute(num_retries=args.retries)
541         if dstcol['items_available'] > 0:
542             for d in dstcol['items']:
543                 if ((args.project_uuid == d['owner_uuid']) and
544                     (c.get('name') == d['name']) and
545                     (c['portable_data_hash'] == d['portable_data_hash'])):
546                     return d
547             c['manifest_text'] = dst.collections().get(
548                 uuid=dstcol['items'][0]['uuid']
549             ).execute(num_retries=args.retries)['manifest_text']
550             return create_collection_from(c, src, dst, args)
551
552     # Fetch the collection's manifest.
553     manifest = c['manifest_text']
554     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
555
556     # Copy each block from src_keep to dst_keep.
557     # Use the newly signed locators returned from dst_keep to build
558     # a new manifest as we go.
559     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
560     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
561     dst_manifest = io.StringIO()
562     dst_locators = {}
563     bytes_written = 0
564     bytes_expected = total_collection_size(manifest)
565     if args.progress:
566         progress_writer = ProgressWriter(human_progress)
567     else:
568         progress_writer = None
569
570     for line in manifest.splitlines():
571         words = line.split()
572         dst_manifest.write(words[0])
573         for word in words[1:]:
574             try:
575                 loc = arvados.KeepLocator(word)
576             except ValueError:
577                 # If 'word' can't be parsed as a locator,
578                 # presume it's a filename.
579                 dst_manifest.write(' ')
580                 dst_manifest.write(word)
581                 continue
582             blockhash = loc.md5sum
583             # copy this block if we haven't seen it before
584             # (otherwise, just reuse the existing dst_locator)
585             if blockhash not in dst_locators:
586                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
587                 if progress_writer:
588                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
589                 data = src_keep.get(word)
590                 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
591                 dst_locators[blockhash] = dst_locator
592                 bytes_written += loc.size
593             dst_manifest.write(' ')
594             dst_manifest.write(dst_locators[blockhash])
595         dst_manifest.write("\n")
596
597     if progress_writer:
598         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
599         progress_writer.finish()
600
601     # Copy the manifest and save the collection.
602     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
603
604     c['manifest_text'] = dst_manifest.getvalue()
605     return create_collection_from(c, src, dst, args)
606
607 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
608     r = api.repositories().list(
609         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
610     if r['items_available'] != 1:
611         raise Exception('cannot identify repo {}; {} repos found'
612                         .format(repo_name, r['items_available']))
613
614     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
615     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
616     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
617
618     priority = https_url + other_url + http_url
619
620     for url in priority:
621         if url.startswith("http"):
622             u = urllib.parse.urlsplit(url)
623             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
624             git_config = ["-c", "credential.%s/.username=none" % baseurl,
625                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
626         else:
627             git_config = []
628
629         try:
630             logger.debug("trying %s", url)
631             subprocess.run(
632                 ['git', *git_config, 'ls-remote', url],
633                 check=True,
634                 env={
635                     'ARVADOS_API_TOKEN': api.api_token,
636                     'GIT_ASKPASS': '/bin/false',
637                     'HOME': os.environ['HOME'],
638                 },
639                 stdout=subprocess.DEVNULL,
640             )
641         except subprocess.CalledProcessError:
642             pass
643         else:
644             git_url = url
645             break
646     else:
647         raise Exception('Cannot access git repository, tried {}'
648                         .format(priority))
649
650     if git_url.startswith("http:"):
651         if allow_insecure_http:
652             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
653         else:
654             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
655
656     return (git_url, git_config)
657
658
659 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
660     """Copy the docker image identified by docker_image and
661     docker_image_tag from src to dst. Create appropriate
662     docker_image_repo+tag and docker_image_hash links at dst.
663
664     """
665
666     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
667
668     # Find the link identifying this docker image.
669     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
670         src, args.retries, docker_image, docker_image_tag)
671     if docker_image_list:
672         image_uuid, image_info = docker_image_list[0]
673         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
674
675         # Copy the collection it refers to.
676         dst_image_col = copy_collection(image_uuid, src, dst, args)
677     elif arvados.util.keep_locator_pattern.match(docker_image):
678         dst_image_col = copy_collection(docker_image, src, dst, args)
679     else:
680         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
681
682 def copy_project(obj_uuid, src, dst, owner_uuid, args):
683
684     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
685
686     # Create/update the destination project
687     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
688                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
689     if len(existing["items"]) == 0:
690         project_record = dst.groups().create(body={"group": {"group_class": "project",
691                                                              "owner_uuid": owner_uuid,
692                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
693     else:
694         project_record = existing["items"][0]
695
696     dst.groups().update(uuid=project_record["uuid"],
697                         body={"group": {
698                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
699
700     args.project_uuid = project_record["uuid"]
701
702     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
703
704
705     partial_error = ""
706
707     # Copy collections
708     try:
709         copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
710                          src, dst, args)
711     except Exception as e:
712         partial_error += "\n" + str(e)
713
714     # Copy workflows
715     for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
716         try:
717             copy_workflow(w["uuid"], src, dst, args)
718         except Exception as e:
719             partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
720
721     if args.recursive:
722         for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
723             try:
724                 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
725             except Exception as e:
726                 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
727
728     project_record["partial_error"] = partial_error
729
730     return project_record
731
732 # git_rev_parse(rev, repo)
733 #
734 #    Returns the 40-character commit hash corresponding to 'rev' in
735 #    git repository 'repo' (which must be the path of a local git
736 #    repository)
737 #
738 def git_rev_parse(rev, repo):
739     proc = subprocess.run(
740         ['git', 'rev-parse', rev],
741         check=True,
742         cwd=repo,
743         stdout=subprocess.PIPE,
744         text=True,
745     )
746     return proc.stdout.read().strip()
747
748 # uuid_type(api, object_uuid)
749 #
750 #    Returns the name of the class that object_uuid belongs to, based on
751 #    the second field of the uuid.  This function consults the api's
752 #    schema to identify the object class.
753 #
754 #    It returns a string such as 'Collection', 'Workflow', etc.
755 #
756 #    Special case: if handed a Keep locator hash, return 'Collection'.
757 #
758 def uuid_type(api, object_uuid):
759     if re.match(arvados.util.keep_locator_pattern, object_uuid):
760         return 'Collection'
761     p = object_uuid.split('-')
762     if len(p) == 3:
763         type_prefix = p[1]
764         for k in api._schema.schemas:
765             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
766             if type_prefix == obj_class:
767                 return k
768     return None
769
770 def abort(msg, code=1):
771     logger.info("arv-copy: %s", msg)
772     exit(code)
773
774
775 # Code for reporting on the progress of a collection upload.
776 # Stolen from arvados.commands.put.ArvPutCollectionWriter
777 # TODO(twp): figure out how to refactor into a shared library
778 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
779 # code)
780
781 def machine_progress(obj_uuid, bytes_written, bytes_expected):
782     return "{} {}: {} {} written {} total\n".format(
783         sys.argv[0],
784         os.getpid(),
785         obj_uuid,
786         bytes_written,
787         -1 if (bytes_expected is None) else bytes_expected)
788
789 def human_progress(obj_uuid, bytes_written, bytes_expected):
790     if bytes_expected:
791         return "\r{}: {}M / {}M {:.1%} ".format(
792             obj_uuid,
793             bytes_written >> 20, bytes_expected >> 20,
794             float(bytes_written) / bytes_expected)
795     else:
796         return "\r{}: {} ".format(obj_uuid, bytes_written)
797
798 class ProgressWriter(object):
799     _progress_func = None
800     outfile = sys.stderr
801
802     def __init__(self, progress_func):
803         self._progress_func = progress_func
804
805     def report(self, obj_uuid, bytes_written, bytes_expected):
806         if self._progress_func is not None:
807             self.outfile.write(
808                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
809
810     def finish(self):
811         self.outfile.write("\n")
812
813 if __name__ == '__main__':
814     main()