Merge branch '19070-update-workflow-deps' refs #19070
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import sys
34 import logging
35 import tempfile
36 import urllib.parse
37 import io
38
39 import arvados
40 import arvados.config
41 import arvados.keep
42 import arvados.util
43 import arvados.commands._util as arv_cmd
44 import arvados.commands.keepdocker
45 import ruamel.yaml as yaml
46
47 from arvados.api import OrderedJsonModel
48 from arvados._version import __version__
49
50 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
51
52 logger = logging.getLogger('arvados.arv-copy')
53
54 # local_repo_dir records which git repositories from the Arvados source
55 # instance have been checked out locally during this run, and to which
56 # directories.
57 # e.g. if repository 'twp' from src_arv has been cloned into
58 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
59 #
60 local_repo_dir = {}
61
62 # List of collections that have been copied in this session, and their
63 # destination collection UUIDs.
64 collections_copied = {}
65
66 # Set of (repository, script_version) two-tuples of commits copied in git.
67 scripts_copied = set()
68
69 # The owner_uuid of the object being copied
70 src_owner_uuid = None
71
72 def main():
73     copy_opts = argparse.ArgumentParser(add_help=False)
74
75     copy_opts.add_argument(
76         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
77         help='Print version and exit.')
78     copy_opts.add_argument(
79         '-v', '--verbose', dest='verbose', action='store_true',
80         help='Verbose output.')
81     copy_opts.add_argument(
82         '--progress', dest='progress', action='store_true',
83         help='Report progress on copying collections. (default)')
84     copy_opts.add_argument(
85         '--no-progress', dest='progress', action='store_false',
86         help='Do not report progress on copying collections.')
87     copy_opts.add_argument(
88         '-f', '--force', dest='force', action='store_true',
89         help='Perform copy even if the object appears to exist at the remote destination.')
90     copy_opts.add_argument(
91         '--src', dest='source_arvados',
92         help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
93     copy_opts.add_argument(
94         '--dst', dest='destination_arvados',
95         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
96     copy_opts.add_argument(
97         '--recursive', dest='recursive', action='store_true',
98         help='Recursively copy any dependencies for this object, and subprojects. (default)')
99     copy_opts.add_argument(
100         '--no-recursive', dest='recursive', action='store_false',
101         help='Do not copy any dependencies or subprojects.')
102     copy_opts.add_argument(
103         '--project-uuid', dest='project_uuid',
104         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
105     copy_opts.add_argument(
106         '--storage-classes', dest='storage_classes',
107         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
108
109     copy_opts.add_argument(
110         'object_uuid',
111         help='The UUID of the object to be copied.')
112     copy_opts.set_defaults(progress=True)
113     copy_opts.set_defaults(recursive=True)
114
115     parser = argparse.ArgumentParser(
116         description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
117         parents=[copy_opts, arv_cmd.retry_opt])
118     args = parser.parse_args()
119
120     if args.storage_classes:
121         args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
122
123     if args.verbose:
124         logger.setLevel(logging.DEBUG)
125     else:
126         logger.setLevel(logging.INFO)
127
128     if not args.source_arvados:
129         args.source_arvados = args.object_uuid[:5]
130
131     # Create API clients for the source and destination instances
132     src_arv = api_for_instance(args.source_arvados)
133     dst_arv = api_for_instance(args.destination_arvados)
134
135     if not args.project_uuid:
136         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
137
138     # Identify the kind of object we have been given, and begin copying.
139     t = uuid_type(src_arv, args.object_uuid)
140     if t == 'Collection':
141         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
142         result = copy_collection(args.object_uuid,
143                                  src_arv, dst_arv,
144                                  args)
145     elif t == 'Workflow':
146         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
147         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
148     elif t == 'Group':
149         set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
150         result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
151     else:
152         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
153
154     # Clean up any outstanding temp git repositories.
155     for d in listvalues(local_repo_dir):
156         shutil.rmtree(d, ignore_errors=True)
157
158     # If no exception was thrown and the response does not have an
159     # error_token field, presume success
160     if 'error_token' in result or 'uuid' not in result:
161         logger.error("API server returned an error result: {}".format(result))
162         exit(1)
163
164     print(result['uuid'])
165
166     if result.get('partial_error'):
167         logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
168         exit(1)
169
170     logger.info("Success: created copy with uuid {}".format(result['uuid']))
171     exit(0)
172
173 def set_src_owner_uuid(resource, uuid, args):
174     global src_owner_uuid
175     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
176     src_owner_uuid = c.get("owner_uuid")
177
178 # api_for_instance(instance_name)
179 #
180 #     Creates an API client for the Arvados instance identified by
181 #     instance_name.
182 #
183 #     If instance_name contains a slash, it is presumed to be a path
184 #     (either local or absolute) to a file with Arvados configuration
185 #     settings.
186 #
187 #     Otherwise, it is presumed to be the name of a file in
188 #     $HOME/.config/arvados/instance_name.conf
189 #
190 def api_for_instance(instance_name):
191     if not instance_name:
192         # Use environment
193         return arvados.api('v1', model=OrderedJsonModel())
194
195     if '/' in instance_name:
196         config_file = instance_name
197     else:
198         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
199
200     try:
201         cfg = arvados.config.load(config_file)
202     except (IOError, OSError) as e:
203         abort(("Could not open config file {}: {}\n" +
204                "You must make sure that your configuration tokens\n" +
205                "for Arvados instance {} are in {} and that this\n" +
206                "file is readable.").format(
207                    config_file, e, instance_name, config_file))
208
209     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
210         api_is_insecure = (
211             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
212                 ['1', 't', 'true', 'y', 'yes']))
213         client = arvados.api('v1',
214                              host=cfg['ARVADOS_API_HOST'],
215                              token=cfg['ARVADOS_API_TOKEN'],
216                              insecure=api_is_insecure,
217                              model=OrderedJsonModel())
218     else:
219         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
220     return client
221
222 # Check if git is available
223 def check_git_availability():
224     try:
225         arvados.util.run_command(['git', '--help'])
226     except Exception:
227         abort('git command is not available. Please ensure git is installed.')
228
229
230 def filter_iter(arg):
231     """Iterate a filter string-or-list.
232
233     Pass in a filter field that can either be a string or list.
234     This will iterate elements as if the field had been written as a list.
235     """
236     if isinstance(arg, basestring):
237         return iter((arg,))
238     else:
239         return iter(arg)
240
241 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
242     """Update a single repository filter in-place for the destination.
243
244     If the filter checks that the repository is src_repository, it is
245     updated to check that the repository is dst_repository.  If it does
246     anything else, this function raises ValueError.
247     """
248     if src_repository is None:
249         raise ValueError("component does not specify a source repository")
250     elif dst_repository is None:
251         raise ValueError("no destination repository specified to update repository filter")
252     elif repo_filter[1:] == ['=', src_repository]:
253         repo_filter[2] = dst_repository
254     elif repo_filter[1:] == ['in', [src_repository]]:
255         repo_filter[2] = [dst_repository]
256     else:
257         raise ValueError("repository filter is not a simple source match")
258
259 def migrate_script_version_filter(version_filter):
260     """Update a single script_version filter in-place for the destination.
261
262     Currently this function checks that all the filter operands are Git
263     commit hashes.  If they're not, it raises ValueError to indicate that
264     the filter is not portable.  It could be extended to make other
265     transformations in the future.
266     """
267     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
268         raise ValueError("script_version filter is not limited to commit hashes")
269
270 def attr_filtered(filter_, *attr_names):
271     """Return True if filter_ applies to any of attr_names, else False."""
272     return any((name == 'any') or (name in attr_names)
273                for name in filter_iter(filter_[0]))
274
275 @contextlib.contextmanager
276 def exception_handler(handler, *exc_types):
277     """If any exc_types are raised in the block, call handler on the exception."""
278     try:
279         yield
280     except exc_types as error:
281         handler(error)
282
283
284 # copy_workflow(wf_uuid, src, dst, args)
285 #
286 #    Copies a workflow identified by wf_uuid from src to dst.
287 #
288 #    If args.recursive is True, also copy any collections
289 #      referenced in the workflow definition yaml.
290 #
291 #    The owner_uuid of the new workflow is set to any given
292 #      project_uuid or the user who copied the template.
293 #
294 #    Returns the copied workflow object.
295 #
296 def copy_workflow(wf_uuid, src, dst, args):
297     # fetch the workflow from the source instance
298     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
299
300     if not wf["definition"]:
301         logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
302
303     # copy collections and docker images
304     if args.recursive and wf["definition"]:
305         wf_def = yaml.safe_load(wf["definition"])
306         if wf_def is not None:
307             locations = []
308             docker_images = {}
309             graph = wf_def.get('$graph', None)
310             if graph is not None:
311                 workflow_collections(graph, locations, docker_images)
312             else:
313                 workflow_collections(wf_def, locations, docker_images)
314
315             if locations:
316                 copy_collections(locations, src, dst, args)
317
318             for image in docker_images:
319                 copy_docker_image(image, docker_images[image], src, dst, args)
320
321     # copy the workflow itself
322     del wf['uuid']
323     wf['owner_uuid'] = args.project_uuid
324
325     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
326                                              ["name", "=", wf["name"]]]).execute()
327     if len(existing["items"]) == 0:
328         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
329     else:
330         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
331
332
333 def workflow_collections(obj, locations, docker_images):
334     if isinstance(obj, dict):
335         loc = obj.get('location', None)
336         if loc is not None:
337             if loc.startswith("keep:"):
338                 locations.append(loc[5:])
339
340         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
341         if docker_image is not None:
342             ds = docker_image.split(":", 1)
343             tag = ds[1] if len(ds)==2 else 'latest'
344             docker_images[ds[0]] = tag
345
346         for x in obj:
347             workflow_collections(obj[x], locations, docker_images)
348     elif isinstance(obj, list):
349         for x in obj:
350             workflow_collections(x, locations, docker_images)
351
352 # copy_collections(obj, src, dst, args)
353 #
354 #    Recursively copies all collections referenced by 'obj' from src
355 #    to dst.  obj may be a dict or a list, in which case we run
356 #    copy_collections on every value it contains. If it is a string,
357 #    search it for any substring that matches a collection hash or uuid
358 #    (this will find hidden references to collections like
359 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
360 #
361 #    Returns a copy of obj with any old collection uuids replaced by
362 #    the new ones.
363 #
364 def copy_collections(obj, src, dst, args):
365
366     def copy_collection_fn(collection_match):
367         """Helper function for regex substitution: copies a single collection,
368         identified by the collection_match MatchObject, to the
369         destination.  Returns the destination collection uuid (or the
370         portable data hash if that's what src_id is).
371
372         """
373         src_id = collection_match.group(0)
374         if src_id not in collections_copied:
375             dst_col = copy_collection(src_id, src, dst, args)
376             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
377                 collections_copied[src_id] = src_id
378             else:
379                 collections_copied[src_id] = dst_col['uuid']
380         return collections_copied[src_id]
381
382     if isinstance(obj, basestring):
383         # Copy any collections identified in this string to dst, replacing
384         # them with the dst uuids as necessary.
385         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
386         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
387         return obj
388     elif isinstance(obj, dict):
389         return type(obj)((v, copy_collections(obj[v], src, dst, args))
390                          for v in obj)
391     elif isinstance(obj, list):
392         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
393     return obj
394
395
396 def total_collection_size(manifest_text):
397     """Return the total number of bytes in this collection (excluding
398     duplicate blocks)."""
399
400     total_bytes = 0
401     locators_seen = {}
402     for line in manifest_text.splitlines():
403         words = line.split()
404         for word in words[1:]:
405             try:
406                 loc = arvados.KeepLocator(word)
407             except ValueError:
408                 continue  # this word isn't a locator, skip it
409             if loc.md5sum not in locators_seen:
410                 locators_seen[loc.md5sum] = True
411                 total_bytes += loc.size
412
413     return total_bytes
414
415 def create_collection_from(c, src, dst, args):
416     """Create a new collection record on dst, and copy Docker metadata if
417     available."""
418
419     collection_uuid = c['uuid']
420     body = {}
421     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
422         body[d] = c[d]
423
424     if not body["name"]:
425         body['name'] = "copied from " + collection_uuid
426
427     if args.storage_classes:
428         body['storage_classes_desired'] = args.storage_classes
429
430     body['owner_uuid'] = args.project_uuid
431
432     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
433
434     # Create docker_image_repo+tag and docker_image_hash links
435     # at the destination.
436     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
437         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
438
439         for src_link in docker_links:
440             body = {key: src_link[key]
441                     for key in ['link_class', 'name', 'properties']}
442             body['head_uuid'] = dst_collection['uuid']
443             body['owner_uuid'] = args.project_uuid
444
445             lk = dst.links().create(body=body).execute(num_retries=args.retries)
446             logger.debug('created dst link {}'.format(lk))
447
448     return dst_collection
449
450 # copy_collection(obj_uuid, src, dst, args)
451 #
452 #    Copies the collection identified by obj_uuid from src to dst.
453 #    Returns the collection object created at dst.
454 #
455 #    If args.progress is True, produce a human-friendly progress
456 #    report.
457 #
458 #    If a collection with the desired portable_data_hash already
459 #    exists at dst, and args.force is False, copy_collection returns
460 #    the existing collection without copying any blocks.  Otherwise
461 #    (if no collection exists or if args.force is True)
462 #    copy_collection copies all of the collection data blocks from src
463 #    to dst.
464 #
465 #    For this application, it is critical to preserve the
466 #    collection's manifest hash, which is not guaranteed with the
467 #    arvados.CollectionReader and arvados.CollectionWriter classes.
468 #    Copying each block in the collection manually, followed by
469 #    the manifest block, ensures that the collection's manifest
470 #    hash will not change.
471 #
472 def copy_collection(obj_uuid, src, dst, args):
473     if arvados.util.keep_locator_pattern.match(obj_uuid):
474         # If the obj_uuid is a portable data hash, it might not be
475         # uniquely identified with a particular collection.  As a
476         # result, it is ambiguous as to what name to use for the copy.
477         # Apply some heuristics to pick which collection to get the
478         # name from.
479         srccol = src.collections().list(
480             filters=[['portable_data_hash', '=', obj_uuid]],
481             order="created_at asc"
482             ).execute(num_retries=args.retries)
483
484         items = srccol.get("items")
485
486         if not items:
487             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
488             return
489
490         c = None
491
492         if len(items) == 1:
493             # There's only one collection with the PDH, so use that.
494             c = items[0]
495         if not c:
496             # See if there is a collection that's in the same project
497             # as the root item (usually a workflow) being copied.
498             for i in items:
499                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
500                     c = i
501                     break
502         if not c:
503             # Didn't find any collections located in the same project, so
504             # pick the oldest collection that has a name assigned to it.
505             for i in items:
506                 if i.get("name"):
507                     c = i
508                     break
509         if not c:
510             # None of the collections have names (?!), so just pick the
511             # first one.
512             c = items[0]
513
514         # list() doesn't return manifest text (and we don't want it to,
515         # because we don't need the same maninfest text sent to us 50
516         # times) so go and retrieve the collection object directly
517         # which will include the manifest text.
518         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
519     else:
520         # Assume this is an actual collection uuid, so fetch it directly.
521         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
522
523     # If a collection with this hash already exists at the
524     # destination, and 'force' is not true, just return that
525     # collection.
526     if not args.force:
527         if 'portable_data_hash' in c:
528             colhash = c['portable_data_hash']
529         else:
530             colhash = c['uuid']
531         dstcol = dst.collections().list(
532             filters=[['portable_data_hash', '=', colhash]]
533         ).execute(num_retries=args.retries)
534         if dstcol['items_available'] > 0:
535             for d in dstcol['items']:
536                 if ((args.project_uuid == d['owner_uuid']) and
537                     (c.get('name') == d['name']) and
538                     (c['portable_data_hash'] == d['portable_data_hash'])):
539                     return d
540             c['manifest_text'] = dst.collections().get(
541                 uuid=dstcol['items'][0]['uuid']
542             ).execute(num_retries=args.retries)['manifest_text']
543             return create_collection_from(c, src, dst, args)
544
545     # Fetch the collection's manifest.
546     manifest = c['manifest_text']
547     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
548
549     # Copy each block from src_keep to dst_keep.
550     # Use the newly signed locators returned from dst_keep to build
551     # a new manifest as we go.
552     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
553     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
554     dst_manifest = io.StringIO()
555     dst_locators = {}
556     bytes_written = 0
557     bytes_expected = total_collection_size(manifest)
558     if args.progress:
559         progress_writer = ProgressWriter(human_progress)
560     else:
561         progress_writer = None
562
563     for line in manifest.splitlines():
564         words = line.split()
565         dst_manifest.write(words[0])
566         for word in words[1:]:
567             try:
568                 loc = arvados.KeepLocator(word)
569             except ValueError:
570                 # If 'word' can't be parsed as a locator,
571                 # presume it's a filename.
572                 dst_manifest.write(' ')
573                 dst_manifest.write(word)
574                 continue
575             blockhash = loc.md5sum
576             # copy this block if we haven't seen it before
577             # (otherwise, just reuse the existing dst_locator)
578             if blockhash not in dst_locators:
579                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
580                 if progress_writer:
581                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
582                 data = src_keep.get(word)
583                 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
584                 dst_locators[blockhash] = dst_locator
585                 bytes_written += loc.size
586             dst_manifest.write(' ')
587             dst_manifest.write(dst_locators[blockhash])
588         dst_manifest.write("\n")
589
590     if progress_writer:
591         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
592         progress_writer.finish()
593
594     # Copy the manifest and save the collection.
595     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
596
597     c['manifest_text'] = dst_manifest.getvalue()
598     return create_collection_from(c, src, dst, args)
599
600 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
601     r = api.repositories().list(
602         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
603     if r['items_available'] != 1:
604         raise Exception('cannot identify repo {}; {} repos found'
605                         .format(repo_name, r['items_available']))
606
607     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
608     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
609     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
610
611     priority = https_url + other_url + http_url
612
613     git_config = []
614     git_url = None
615     for url in priority:
616         if url.startswith("http"):
617             u = urllib.parse.urlsplit(url)
618             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
619             git_config = ["-c", "credential.%s/.username=none" % baseurl,
620                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
621         else:
622             git_config = []
623
624         try:
625             logger.debug("trying %s", url)
626             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
627                                       env={"HOME": os.environ["HOME"],
628                                            "ARVADOS_API_TOKEN": api.api_token,
629                                            "GIT_ASKPASS": "/bin/false"})
630         except arvados.errors.CommandFailedError:
631             pass
632         else:
633             git_url = url
634             break
635
636     if not git_url:
637         raise Exception('Cannot access git repository, tried {}'
638                         .format(priority))
639
640     if git_url.startswith("http:"):
641         if allow_insecure_http:
642             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
643         else:
644             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
645
646     return (git_url, git_config)
647
648
649 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
650     """Copy the docker image identified by docker_image and
651     docker_image_tag from src to dst. Create appropriate
652     docker_image_repo+tag and docker_image_hash links at dst.
653
654     """
655
656     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
657
658     # Find the link identifying this docker image.
659     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
660         src, args.retries, docker_image, docker_image_tag)
661     if docker_image_list:
662         image_uuid, image_info = docker_image_list[0]
663         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
664
665         # Copy the collection it refers to.
666         dst_image_col = copy_collection(image_uuid, src, dst, args)
667     elif arvados.util.keep_locator_pattern.match(docker_image):
668         dst_image_col = copy_collection(docker_image, src, dst, args)
669     else:
670         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
671
672 def copy_project(obj_uuid, src, dst, owner_uuid, args):
673
674     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
675
676     # Create/update the destination project
677     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
678                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
679     if len(existing["items"]) == 0:
680         project_record = dst.groups().create(body={"group": {"group_class": "project",
681                                                              "owner_uuid": owner_uuid,
682                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
683     else:
684         project_record = existing["items"][0]
685
686     dst.groups().update(uuid=project_record["uuid"],
687                         body={"group": {
688                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
689
690     args.project_uuid = project_record["uuid"]
691
692     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
693
694
695     partial_error = ""
696
697     # Copy collections
698     try:
699         copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
700                          src, dst, args)
701     except Exception as e:
702         partial_error += "\n" + str(e)
703
704     # Copy workflows
705     for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
706         try:
707             copy_workflow(w["uuid"], src, dst, args)
708         except Exception as e:
709             partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
710
711     if args.recursive:
712         for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
713             try:
714                 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
715             except Exception as e:
716                 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
717
718     project_record["partial_error"] = partial_error
719
720     return project_record
721
722 # git_rev_parse(rev, repo)
723 #
724 #    Returns the 40-character commit hash corresponding to 'rev' in
725 #    git repository 'repo' (which must be the path of a local git
726 #    repository)
727 #
728 def git_rev_parse(rev, repo):
729     gitout, giterr = arvados.util.run_command(
730         ['git', 'rev-parse', rev], cwd=repo)
731     return gitout.strip()
732
733 # uuid_type(api, object_uuid)
734 #
735 #    Returns the name of the class that object_uuid belongs to, based on
736 #    the second field of the uuid.  This function consults the api's
737 #    schema to identify the object class.
738 #
739 #    It returns a string such as 'Collection', 'Workflow', etc.
740 #
741 #    Special case: if handed a Keep locator hash, return 'Collection'.
742 #
743 def uuid_type(api, object_uuid):
744     if re.match(arvados.util.keep_locator_pattern, object_uuid):
745         return 'Collection'
746     p = object_uuid.split('-')
747     if len(p) == 3:
748         type_prefix = p[1]
749         for k in api._schema.schemas:
750             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
751             if type_prefix == obj_class:
752                 return k
753     return None
754
755 def abort(msg, code=1):
756     logger.info("arv-copy: %s", msg)
757     exit(code)
758
759
760 # Code for reporting on the progress of a collection upload.
761 # Stolen from arvados.commands.put.ArvPutCollectionWriter
762 # TODO(twp): figure out how to refactor into a shared library
763 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
764 # code)
765
766 def machine_progress(obj_uuid, bytes_written, bytes_expected):
767     return "{} {}: {} {} written {} total\n".format(
768         sys.argv[0],
769         os.getpid(),
770         obj_uuid,
771         bytes_written,
772         -1 if (bytes_expected is None) else bytes_expected)
773
774 def human_progress(obj_uuid, bytes_written, bytes_expected):
775     if bytes_expected:
776         return "\r{}: {}M / {}M {:.1%} ".format(
777             obj_uuid,
778             bytes_written >> 20, bytes_expected >> 20,
779             float(bytes_written) / bytes_expected)
780     else:
781         return "\r{}: {} ".format(obj_uuid, bytes_written)
782
783 class ProgressWriter(object):
784     _progress_func = None
785     outfile = sys.stderr
786
787     def __init__(self, progress_func):
788         self._progress_func = progress_func
789
790     def report(self, obj_uuid, bytes_written, bytes_expected):
791         if self._progress_func is not None:
792             self.outfile.write(
793                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
794
795     def finish(self):
796         self.outfile.write("\n")
797
798 if __name__ == '__main__':
799     main()