Merge branch 'main' from workbench2.git
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import subprocess
34 import sys
35 import logging
36 import tempfile
37 import urllib.parse
38 import io
39
40 import arvados
41 import arvados.config
42 import arvados.keep
43 import arvados.util
44 import arvados.commands._util as arv_cmd
45 import arvados.commands.keepdocker
46 import ruamel.yaml as yaml
47
48 from arvados._version import __version__
49
50 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
51
52 logger = logging.getLogger('arvados.arv-copy')
53
54 # local_repo_dir records which git repositories from the Arvados source
55 # instance have been checked out locally during this run, and to which
56 # directories.
57 # e.g. if repository 'twp' from src_arv has been cloned into
58 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
59 #
60 local_repo_dir = {}
61
62 # List of collections that have been copied in this session, and their
63 # destination collection UUIDs.
64 collections_copied = {}
65
66 # Set of (repository, script_version) two-tuples of commits copied in git.
67 scripts_copied = set()
68
69 # The owner_uuid of the object being copied
70 src_owner_uuid = None
71
72 def main():
73     copy_opts = argparse.ArgumentParser(add_help=False)
74
75     copy_opts.add_argument(
76         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
77         help='Print version and exit.')
78     copy_opts.add_argument(
79         '-v', '--verbose', dest='verbose', action='store_true',
80         help='Verbose output.')
81     copy_opts.add_argument(
82         '--progress', dest='progress', action='store_true',
83         help='Report progress on copying collections. (default)')
84     copy_opts.add_argument(
85         '--no-progress', dest='progress', action='store_false',
86         help='Do not report progress on copying collections.')
87     copy_opts.add_argument(
88         '-f', '--force', dest='force', action='store_true',
89         help='Perform copy even if the object appears to exist at the remote destination.')
90     copy_opts.add_argument(
91         '--src', dest='source_arvados',
92         help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
93     copy_opts.add_argument(
94         '--dst', dest='destination_arvados',
95         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
96     copy_opts.add_argument(
97         '--recursive', dest='recursive', action='store_true',
98         help='Recursively copy any dependencies for this object, and subprojects. (default)')
99     copy_opts.add_argument(
100         '--no-recursive', dest='recursive', action='store_false',
101         help='Do not copy any dependencies or subprojects.')
102     copy_opts.add_argument(
103         '--project-uuid', dest='project_uuid',
104         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
105     copy_opts.add_argument(
106         '--storage-classes', dest='storage_classes',
107         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
108
109     copy_opts.add_argument(
110         'object_uuid',
111         help='The UUID of the object to be copied.')
112     copy_opts.set_defaults(progress=True)
113     copy_opts.set_defaults(recursive=True)
114
115     parser = argparse.ArgumentParser(
116         description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
117         parents=[copy_opts, arv_cmd.retry_opt])
118     args = parser.parse_args()
119
120     if args.storage_classes:
121         args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
122
123     if args.verbose:
124         logger.setLevel(logging.DEBUG)
125     else:
126         logger.setLevel(logging.INFO)
127
128     if not args.source_arvados:
129         args.source_arvados = args.object_uuid[:5]
130
131     # Create API clients for the source and destination instances
132     src_arv = api_for_instance(args.source_arvados, args.retries)
133     dst_arv = api_for_instance(args.destination_arvados, args.retries)
134
135     if not args.project_uuid:
136         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
137
138     # Identify the kind of object we have been given, and begin copying.
139     t = uuid_type(src_arv, args.object_uuid)
140     if t == 'Collection':
141         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
142         result = copy_collection(args.object_uuid,
143                                  src_arv, dst_arv,
144                                  args)
145     elif t == 'Workflow':
146         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
147         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
148     elif t == 'Group':
149         set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
150         result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
151     else:
152         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
153
154     # Clean up any outstanding temp git repositories.
155     for d in listvalues(local_repo_dir):
156         shutil.rmtree(d, ignore_errors=True)
157
158     # If no exception was thrown and the response does not have an
159     # error_token field, presume success
160     if 'error_token' in result or 'uuid' not in result:
161         logger.error("API server returned an error result: {}".format(result))
162         exit(1)
163
164     print(result['uuid'])
165
166     if result.get('partial_error'):
167         logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
168         exit(1)
169
170     logger.info("Success: created copy with uuid {}".format(result['uuid']))
171     exit(0)
172
173 def set_src_owner_uuid(resource, uuid, args):
174     global src_owner_uuid
175     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
176     src_owner_uuid = c.get("owner_uuid")
177
178 # api_for_instance(instance_name)
179 #
180 #     Creates an API client for the Arvados instance identified by
181 #     instance_name.
182 #
183 #     If instance_name contains a slash, it is presumed to be a path
184 #     (either local or absolute) to a file with Arvados configuration
185 #     settings.
186 #
187 #     Otherwise, it is presumed to be the name of a file in
188 #     $HOME/.config/arvados/instance_name.conf
189 #
190 def api_for_instance(instance_name, num_retries):
191     if not instance_name:
192         # Use environment
193         return arvados.api('v1')
194
195     if '/' in instance_name:
196         config_file = instance_name
197     else:
198         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
199
200     try:
201         cfg = arvados.config.load(config_file)
202     except (IOError, OSError) as e:
203         abort(("Could not open config file {}: {}\n" +
204                "You must make sure that your configuration tokens\n" +
205                "for Arvados instance {} are in {} and that this\n" +
206                "file is readable.").format(
207                    config_file, e, instance_name, config_file))
208
209     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
210         api_is_insecure = (
211             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
212                 ['1', 't', 'true', 'y', 'yes']))
213         client = arvados.api('v1',
214                              host=cfg['ARVADOS_API_HOST'],
215                              token=cfg['ARVADOS_API_TOKEN'],
216                              insecure=api_is_insecure,
217                              num_retries=num_retries,
218                              )
219     else:
220         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
221     return client
222
223 # Check if git is available
224 def check_git_availability():
225     try:
226         subprocess.run(
227             ['git', '--version'],
228             check=True,
229             stdout=subprocess.DEVNULL,
230         )
231     except FileNotFoundError:
232         abort('git command is not available. Please ensure git is installed.')
233
234
235 def filter_iter(arg):
236     """Iterate a filter string-or-list.
237
238     Pass in a filter field that can either be a string or list.
239     This will iterate elements as if the field had been written as a list.
240     """
241     if isinstance(arg, basestring):
242         return iter((arg,))
243     else:
244         return iter(arg)
245
246 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
247     """Update a single repository filter in-place for the destination.
248
249     If the filter checks that the repository is src_repository, it is
250     updated to check that the repository is dst_repository.  If it does
251     anything else, this function raises ValueError.
252     """
253     if src_repository is None:
254         raise ValueError("component does not specify a source repository")
255     elif dst_repository is None:
256         raise ValueError("no destination repository specified to update repository filter")
257     elif repo_filter[1:] == ['=', src_repository]:
258         repo_filter[2] = dst_repository
259     elif repo_filter[1:] == ['in', [src_repository]]:
260         repo_filter[2] = [dst_repository]
261     else:
262         raise ValueError("repository filter is not a simple source match")
263
264 def migrate_script_version_filter(version_filter):
265     """Update a single script_version filter in-place for the destination.
266
267     Currently this function checks that all the filter operands are Git
268     commit hashes.  If they're not, it raises ValueError to indicate that
269     the filter is not portable.  It could be extended to make other
270     transformations in the future.
271     """
272     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
273         raise ValueError("script_version filter is not limited to commit hashes")
274
275 def attr_filtered(filter_, *attr_names):
276     """Return True if filter_ applies to any of attr_names, else False."""
277     return any((name == 'any') or (name in attr_names)
278                for name in filter_iter(filter_[0]))
279
280 @contextlib.contextmanager
281 def exception_handler(handler, *exc_types):
282     """If any exc_types are raised in the block, call handler on the exception."""
283     try:
284         yield
285     except exc_types as error:
286         handler(error)
287
288
289 # copy_workflow(wf_uuid, src, dst, args)
290 #
291 #    Copies a workflow identified by wf_uuid from src to dst.
292 #
293 #    If args.recursive is True, also copy any collections
294 #      referenced in the workflow definition yaml.
295 #
296 #    The owner_uuid of the new workflow is set to any given
297 #      project_uuid or the user who copied the template.
298 #
299 #    Returns the copied workflow object.
300 #
301 def copy_workflow(wf_uuid, src, dst, args):
302     # fetch the workflow from the source instance
303     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
304
305     if not wf["definition"]:
306         logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
307
308     # copy collections and docker images
309     if args.recursive and wf["definition"]:
310         wf_def = yaml.safe_load(wf["definition"])
311         if wf_def is not None:
312             locations = []
313             docker_images = {}
314             graph = wf_def.get('$graph', None)
315             if graph is not None:
316                 workflow_collections(graph, locations, docker_images)
317             else:
318                 workflow_collections(wf_def, locations, docker_images)
319
320             if locations:
321                 copy_collections(locations, src, dst, args)
322
323             for image in docker_images:
324                 copy_docker_image(image, docker_images[image], src, dst, args)
325
326     # copy the workflow itself
327     del wf['uuid']
328     wf['owner_uuid'] = args.project_uuid
329
330     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
331                                              ["name", "=", wf["name"]]]).execute()
332     if len(existing["items"]) == 0:
333         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
334     else:
335         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
336
337
338 def workflow_collections(obj, locations, docker_images):
339     if isinstance(obj, dict):
340         loc = obj.get('location', None)
341         if loc is not None:
342             if loc.startswith("keep:"):
343                 locations.append(loc[5:])
344
345         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
346         if docker_image is not None:
347             ds = docker_image.split(":", 1)
348             tag = ds[1] if len(ds)==2 else 'latest'
349             docker_images[ds[0]] = tag
350
351         for x in obj:
352             workflow_collections(obj[x], locations, docker_images)
353     elif isinstance(obj, list):
354         for x in obj:
355             workflow_collections(x, locations, docker_images)
356
357 # copy_collections(obj, src, dst, args)
358 #
359 #    Recursively copies all collections referenced by 'obj' from src
360 #    to dst.  obj may be a dict or a list, in which case we run
361 #    copy_collections on every value it contains. If it is a string,
362 #    search it for any substring that matches a collection hash or uuid
363 #    (this will find hidden references to collections like
364 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
365 #
366 #    Returns a copy of obj with any old collection uuids replaced by
367 #    the new ones.
368 #
369 def copy_collections(obj, src, dst, args):
370
371     def copy_collection_fn(collection_match):
372         """Helper function for regex substitution: copies a single collection,
373         identified by the collection_match MatchObject, to the
374         destination.  Returns the destination collection uuid (or the
375         portable data hash if that's what src_id is).
376
377         """
378         src_id = collection_match.group(0)
379         if src_id not in collections_copied:
380             dst_col = copy_collection(src_id, src, dst, args)
381             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
382                 collections_copied[src_id] = src_id
383             else:
384                 collections_copied[src_id] = dst_col['uuid']
385         return collections_copied[src_id]
386
387     if isinstance(obj, basestring):
388         # Copy any collections identified in this string to dst, replacing
389         # them with the dst uuids as necessary.
390         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
391         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
392         return obj
393     elif isinstance(obj, dict):
394         return type(obj)((v, copy_collections(obj[v], src, dst, args))
395                          for v in obj)
396     elif isinstance(obj, list):
397         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
398     return obj
399
400
401 def total_collection_size(manifest_text):
402     """Return the total number of bytes in this collection (excluding
403     duplicate blocks)."""
404
405     total_bytes = 0
406     locators_seen = {}
407     for line in manifest_text.splitlines():
408         words = line.split()
409         for word in words[1:]:
410             try:
411                 loc = arvados.KeepLocator(word)
412             except ValueError:
413                 continue  # this word isn't a locator, skip it
414             if loc.md5sum not in locators_seen:
415                 locators_seen[loc.md5sum] = True
416                 total_bytes += loc.size
417
418     return total_bytes
419
420 def create_collection_from(c, src, dst, args):
421     """Create a new collection record on dst, and copy Docker metadata if
422     available."""
423
424     collection_uuid = c['uuid']
425     body = {}
426     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
427         body[d] = c[d]
428
429     if not body["name"]:
430         body['name'] = "copied from " + collection_uuid
431
432     if args.storage_classes:
433         body['storage_classes_desired'] = args.storage_classes
434
435     body['owner_uuid'] = args.project_uuid
436
437     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
438
439     # Create docker_image_repo+tag and docker_image_hash links
440     # at the destination.
441     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
442         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
443
444         for src_link in docker_links:
445             body = {key: src_link[key]
446                     for key in ['link_class', 'name', 'properties']}
447             body['head_uuid'] = dst_collection['uuid']
448             body['owner_uuid'] = args.project_uuid
449
450             lk = dst.links().create(body=body).execute(num_retries=args.retries)
451             logger.debug('created dst link {}'.format(lk))
452
453     return dst_collection
454
455 # copy_collection(obj_uuid, src, dst, args)
456 #
457 #    Copies the collection identified by obj_uuid from src to dst.
458 #    Returns the collection object created at dst.
459 #
460 #    If args.progress is True, produce a human-friendly progress
461 #    report.
462 #
463 #    If a collection with the desired portable_data_hash already
464 #    exists at dst, and args.force is False, copy_collection returns
465 #    the existing collection without copying any blocks.  Otherwise
466 #    (if no collection exists or if args.force is True)
467 #    copy_collection copies all of the collection data blocks from src
468 #    to dst.
469 #
470 #    For this application, it is critical to preserve the
471 #    collection's manifest hash, which is not guaranteed with the
472 #    arvados.CollectionReader and arvados.CollectionWriter classes.
473 #    Copying each block in the collection manually, followed by
474 #    the manifest block, ensures that the collection's manifest
475 #    hash will not change.
476 #
477 def copy_collection(obj_uuid, src, dst, args):
478     if arvados.util.keep_locator_pattern.match(obj_uuid):
479         # If the obj_uuid is a portable data hash, it might not be
480         # uniquely identified with a particular collection.  As a
481         # result, it is ambiguous as to what name to use for the copy.
482         # Apply some heuristics to pick which collection to get the
483         # name from.
484         srccol = src.collections().list(
485             filters=[['portable_data_hash', '=', obj_uuid]],
486             order="created_at asc"
487             ).execute(num_retries=args.retries)
488
489         items = srccol.get("items")
490
491         if not items:
492             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
493             return
494
495         c = None
496
497         if len(items) == 1:
498             # There's only one collection with the PDH, so use that.
499             c = items[0]
500         if not c:
501             # See if there is a collection that's in the same project
502             # as the root item (usually a workflow) being copied.
503             for i in items:
504                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
505                     c = i
506                     break
507         if not c:
508             # Didn't find any collections located in the same project, so
509             # pick the oldest collection that has a name assigned to it.
510             for i in items:
511                 if i.get("name"):
512                     c = i
513                     break
514         if not c:
515             # None of the collections have names (?!), so just pick the
516             # first one.
517             c = items[0]
518
519         # list() doesn't return manifest text (and we don't want it to,
520         # because we don't need the same maninfest text sent to us 50
521         # times) so go and retrieve the collection object directly
522         # which will include the manifest text.
523         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
524     else:
525         # Assume this is an actual collection uuid, so fetch it directly.
526         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
527
528     # If a collection with this hash already exists at the
529     # destination, and 'force' is not true, just return that
530     # collection.
531     if not args.force:
532         if 'portable_data_hash' in c:
533             colhash = c['portable_data_hash']
534         else:
535             colhash = c['uuid']
536         dstcol = dst.collections().list(
537             filters=[['portable_data_hash', '=', colhash]]
538         ).execute(num_retries=args.retries)
539         if dstcol['items_available'] > 0:
540             for d in dstcol['items']:
541                 if ((args.project_uuid == d['owner_uuid']) and
542                     (c.get('name') == d['name']) and
543                     (c['portable_data_hash'] == d['portable_data_hash'])):
544                     return d
545             c['manifest_text'] = dst.collections().get(
546                 uuid=dstcol['items'][0]['uuid']
547             ).execute(num_retries=args.retries)['manifest_text']
548             return create_collection_from(c, src, dst, args)
549
550     # Fetch the collection's manifest.
551     manifest = c['manifest_text']
552     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
553
554     # Copy each block from src_keep to dst_keep.
555     # Use the newly signed locators returned from dst_keep to build
556     # a new manifest as we go.
557     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
558     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
559     dst_manifest = io.StringIO()
560     dst_locators = {}
561     bytes_written = 0
562     bytes_expected = total_collection_size(manifest)
563     if args.progress:
564         progress_writer = ProgressWriter(human_progress)
565     else:
566         progress_writer = None
567
568     for line in manifest.splitlines():
569         words = line.split()
570         dst_manifest.write(words[0])
571         for word in words[1:]:
572             try:
573                 loc = arvados.KeepLocator(word)
574             except ValueError:
575                 # If 'word' can't be parsed as a locator,
576                 # presume it's a filename.
577                 dst_manifest.write(' ')
578                 dst_manifest.write(word)
579                 continue
580             blockhash = loc.md5sum
581             # copy this block if we haven't seen it before
582             # (otherwise, just reuse the existing dst_locator)
583             if blockhash not in dst_locators:
584                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
585                 if progress_writer:
586                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
587                 data = src_keep.get(word)
588                 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
589                 dst_locators[blockhash] = dst_locator
590                 bytes_written += loc.size
591             dst_manifest.write(' ')
592             dst_manifest.write(dst_locators[blockhash])
593         dst_manifest.write("\n")
594
595     if progress_writer:
596         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
597         progress_writer.finish()
598
599     # Copy the manifest and save the collection.
600     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
601
602     c['manifest_text'] = dst_manifest.getvalue()
603     return create_collection_from(c, src, dst, args)
604
605 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
606     r = api.repositories().list(
607         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
608     if r['items_available'] != 1:
609         raise Exception('cannot identify repo {}; {} repos found'
610                         .format(repo_name, r['items_available']))
611
612     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
613     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
614     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
615
616     priority = https_url + other_url + http_url
617
618     for url in priority:
619         if url.startswith("http"):
620             u = urllib.parse.urlsplit(url)
621             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
622             git_config = ["-c", "credential.%s/.username=none" % baseurl,
623                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
624         else:
625             git_config = []
626
627         try:
628             logger.debug("trying %s", url)
629             subprocess.run(
630                 ['git', *git_config, 'ls-remote', url],
631                 check=True,
632                 env={
633                     'ARVADOS_API_TOKEN': api.api_token,
634                     'GIT_ASKPASS': '/bin/false',
635                     'HOME': os.environ['HOME'],
636                 },
637                 stdout=subprocess.DEVNULL,
638             )
639         except subprocess.CalledProcessError:
640             pass
641         else:
642             git_url = url
643             break
644     else:
645         raise Exception('Cannot access git repository, tried {}'
646                         .format(priority))
647
648     if git_url.startswith("http:"):
649         if allow_insecure_http:
650             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
651         else:
652             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
653
654     return (git_url, git_config)
655
656
657 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
658     """Copy the docker image identified by docker_image and
659     docker_image_tag from src to dst. Create appropriate
660     docker_image_repo+tag and docker_image_hash links at dst.
661
662     """
663
664     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
665
666     # Find the link identifying this docker image.
667     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
668         src, args.retries, docker_image, docker_image_tag)
669     if docker_image_list:
670         image_uuid, image_info = docker_image_list[0]
671         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
672
673         # Copy the collection it refers to.
674         dst_image_col = copy_collection(image_uuid, src, dst, args)
675     elif arvados.util.keep_locator_pattern.match(docker_image):
676         dst_image_col = copy_collection(docker_image, src, dst, args)
677     else:
678         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
679
680 def copy_project(obj_uuid, src, dst, owner_uuid, args):
681
682     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
683
684     # Create/update the destination project
685     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
686                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
687     if len(existing["items"]) == 0:
688         project_record = dst.groups().create(body={"group": {"group_class": "project",
689                                                              "owner_uuid": owner_uuid,
690                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
691     else:
692         project_record = existing["items"][0]
693
694     dst.groups().update(uuid=project_record["uuid"],
695                         body={"group": {
696                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
697
698     args.project_uuid = project_record["uuid"]
699
700     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
701
702
703     partial_error = ""
704
705     # Copy collections
706     try:
707         copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
708                          src, dst, args)
709     except Exception as e:
710         partial_error += "\n" + str(e)
711
712     # Copy workflows
713     for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
714         try:
715             copy_workflow(w["uuid"], src, dst, args)
716         except Exception as e:
717             partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
718
719     if args.recursive:
720         for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
721             try:
722                 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
723             except Exception as e:
724                 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
725
726     project_record["partial_error"] = partial_error
727
728     return project_record
729
730 # git_rev_parse(rev, repo)
731 #
732 #    Returns the 40-character commit hash corresponding to 'rev' in
733 #    git repository 'repo' (which must be the path of a local git
734 #    repository)
735 #
736 def git_rev_parse(rev, repo):
737     proc = subprocess.run(
738         ['git', 'rev-parse', rev],
739         check=True,
740         cwd=repo,
741         stdout=subprocess.PIPE,
742         text=True,
743     )
744     return proc.stdout.read().strip()
745
746 # uuid_type(api, object_uuid)
747 #
748 #    Returns the name of the class that object_uuid belongs to, based on
749 #    the second field of the uuid.  This function consults the api's
750 #    schema to identify the object class.
751 #
752 #    It returns a string such as 'Collection', 'Workflow', etc.
753 #
754 #    Special case: if handed a Keep locator hash, return 'Collection'.
755 #
756 def uuid_type(api, object_uuid):
757     if re.match(arvados.util.keep_locator_pattern, object_uuid):
758         return 'Collection'
759     p = object_uuid.split('-')
760     if len(p) == 3:
761         type_prefix = p[1]
762         for k in api._schema.schemas:
763             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
764             if type_prefix == obj_class:
765                 return k
766     return None
767
768 def abort(msg, code=1):
769     logger.info("arv-copy: %s", msg)
770     exit(code)
771
772
773 # Code for reporting on the progress of a collection upload.
774 # Stolen from arvados.commands.put.ArvPutCollectionWriter
775 # TODO(twp): figure out how to refactor into a shared library
776 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
777 # code)
778
779 def machine_progress(obj_uuid, bytes_written, bytes_expected):
780     return "{} {}: {} {} written {} total\n".format(
781         sys.argv[0],
782         os.getpid(),
783         obj_uuid,
784         bytes_written,
785         -1 if (bytes_expected is None) else bytes_expected)
786
787 def human_progress(obj_uuid, bytes_written, bytes_expected):
788     if bytes_expected:
789         return "\r{}: {}M / {}M {:.1%} ".format(
790             obj_uuid,
791             bytes_written >> 20, bytes_expected >> 20,
792             float(bytes_written) / bytes_expected)
793     else:
794         return "\r{}: {} ".format(obj_uuid, bytes_written)
795
796 class ProgressWriter(object):
797     _progress_func = None
798     outfile = sys.stderr
799
800     def __init__(self, progress_func):
801         self._progress_func = progress_func
802
803     def report(self, obj_uuid, bytes_written, bytes_expected):
804         if self._progress_func is not None:
805             self.outfile.write(
806                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
807
808     def finish(self):
809         self.outfile.write("\n")
810
811 if __name__ == '__main__':
812     main()