Merge branch '20229-doc-inspect-requests'
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import sys
34 import logging
35 import tempfile
36 import urllib.parse
37 import io
38
39 import arvados
40 import arvados.config
41 import arvados.keep
42 import arvados.util
43 import arvados.commands._util as arv_cmd
44 import arvados.commands.keepdocker
45 import ruamel.yaml as yaml
46
47 from arvados.api import OrderedJsonModel
48 from arvados._version import __version__
49
50 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
51
52 logger = logging.getLogger('arvados.arv-copy')
53
54 # local_repo_dir records which git repositories from the Arvados source
55 # instance have been checked out locally during this run, and to which
56 # directories.
57 # e.g. if repository 'twp' from src_arv has been cloned into
58 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
59 #
60 local_repo_dir = {}
61
62 # List of collections that have been copied in this session, and their
63 # destination collection UUIDs.
64 collections_copied = {}
65
66 # Set of (repository, script_version) two-tuples of commits copied in git.
67 scripts_copied = set()
68
69 # The owner_uuid of the object being copied
70 src_owner_uuid = None
71
72 def main():
73     copy_opts = argparse.ArgumentParser(add_help=False)
74
75     copy_opts.add_argument(
76         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
77         help='Print version and exit.')
78     copy_opts.add_argument(
79         '-v', '--verbose', dest='verbose', action='store_true',
80         help='Verbose output.')
81     copy_opts.add_argument(
82         '--progress', dest='progress', action='store_true',
83         help='Report progress on copying collections. (default)')
84     copy_opts.add_argument(
85         '--no-progress', dest='progress', action='store_false',
86         help='Do not report progress on copying collections.')
87     copy_opts.add_argument(
88         '-f', '--force', dest='force', action='store_true',
89         help='Perform copy even if the object appears to exist at the remote destination.')
90     copy_opts.add_argument(
91         '--src', dest='source_arvados',
92         help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
93     copy_opts.add_argument(
94         '--dst', dest='destination_arvados',
95         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
96     copy_opts.add_argument(
97         '--recursive', dest='recursive', action='store_true',
98         help='Recursively copy any dependencies for this object, and subprojects. (default)')
99     copy_opts.add_argument(
100         '--no-recursive', dest='recursive', action='store_false',
101         help='Do not copy any dependencies or subprojects.')
102     copy_opts.add_argument(
103         '--project-uuid', dest='project_uuid',
104         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
105     copy_opts.add_argument(
106         '--storage-classes', dest='storage_classes',
107         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
108
109     copy_opts.add_argument(
110         'object_uuid',
111         help='The UUID of the object to be copied.')
112     copy_opts.set_defaults(progress=True)
113     copy_opts.set_defaults(recursive=True)
114
115     parser = argparse.ArgumentParser(
116         description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
117         parents=[copy_opts, arv_cmd.retry_opt])
118     args = parser.parse_args()
119
120     if args.storage_classes:
121         args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
122
123     if args.verbose:
124         logger.setLevel(logging.DEBUG)
125     else:
126         logger.setLevel(logging.INFO)
127
128     if not args.source_arvados:
129         args.source_arvados = args.object_uuid[:5]
130
131     # Create API clients for the source and destination instances
132     src_arv = api_for_instance(args.source_arvados, args.retries)
133     dst_arv = api_for_instance(args.destination_arvados, args.retries)
134
135     if not args.project_uuid:
136         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
137
138     # Identify the kind of object we have been given, and begin copying.
139     t = uuid_type(src_arv, args.object_uuid)
140     if t == 'Collection':
141         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
142         result = copy_collection(args.object_uuid,
143                                  src_arv, dst_arv,
144                                  args)
145     elif t == 'Workflow':
146         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
147         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
148     elif t == 'Group':
149         set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
150         result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
151     else:
152         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
153
154     # Clean up any outstanding temp git repositories.
155     for d in listvalues(local_repo_dir):
156         shutil.rmtree(d, ignore_errors=True)
157
158     # If no exception was thrown and the response does not have an
159     # error_token field, presume success
160     if 'error_token' in result or 'uuid' not in result:
161         logger.error("API server returned an error result: {}".format(result))
162         exit(1)
163
164     print(result['uuid'])
165
166     if result.get('partial_error'):
167         logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
168         exit(1)
169
170     logger.info("Success: created copy with uuid {}".format(result['uuid']))
171     exit(0)
172
173 def set_src_owner_uuid(resource, uuid, args):
174     global src_owner_uuid
175     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
176     src_owner_uuid = c.get("owner_uuid")
177
178 # api_for_instance(instance_name)
179 #
180 #     Creates an API client for the Arvados instance identified by
181 #     instance_name.
182 #
183 #     If instance_name contains a slash, it is presumed to be a path
184 #     (either local or absolute) to a file with Arvados configuration
185 #     settings.
186 #
187 #     Otherwise, it is presumed to be the name of a file in
188 #     $HOME/.config/arvados/instance_name.conf
189 #
190 def api_for_instance(instance_name, num_retries):
191     if not instance_name:
192         # Use environment
193         return arvados.api('v1', model=OrderedJsonModel())
194
195     if '/' in instance_name:
196         config_file = instance_name
197     else:
198         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
199
200     try:
201         cfg = arvados.config.load(config_file)
202     except (IOError, OSError) as e:
203         abort(("Could not open config file {}: {}\n" +
204                "You must make sure that your configuration tokens\n" +
205                "for Arvados instance {} are in {} and that this\n" +
206                "file is readable.").format(
207                    config_file, e, instance_name, config_file))
208
209     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
210         api_is_insecure = (
211             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
212                 ['1', 't', 'true', 'y', 'yes']))
213         client = arvados.api('v1',
214                              host=cfg['ARVADOS_API_HOST'],
215                              token=cfg['ARVADOS_API_TOKEN'],
216                              insecure=api_is_insecure,
217                              model=OrderedJsonModel(),
218                              num_retries=num_retries,
219                              )
220     else:
221         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
222     return client
223
224 # Check if git is available
225 def check_git_availability():
226     try:
227         arvados.util.run_command(['git', '--help'])
228     except Exception:
229         abort('git command is not available. Please ensure git is installed.')
230
231
232 def filter_iter(arg):
233     """Iterate a filter string-or-list.
234
235     Pass in a filter field that can either be a string or list.
236     This will iterate elements as if the field had been written as a list.
237     """
238     if isinstance(arg, basestring):
239         return iter((arg,))
240     else:
241         return iter(arg)
242
243 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
244     """Update a single repository filter in-place for the destination.
245
246     If the filter checks that the repository is src_repository, it is
247     updated to check that the repository is dst_repository.  If it does
248     anything else, this function raises ValueError.
249     """
250     if src_repository is None:
251         raise ValueError("component does not specify a source repository")
252     elif dst_repository is None:
253         raise ValueError("no destination repository specified to update repository filter")
254     elif repo_filter[1:] == ['=', src_repository]:
255         repo_filter[2] = dst_repository
256     elif repo_filter[1:] == ['in', [src_repository]]:
257         repo_filter[2] = [dst_repository]
258     else:
259         raise ValueError("repository filter is not a simple source match")
260
261 def migrate_script_version_filter(version_filter):
262     """Update a single script_version filter in-place for the destination.
263
264     Currently this function checks that all the filter operands are Git
265     commit hashes.  If they're not, it raises ValueError to indicate that
266     the filter is not portable.  It could be extended to make other
267     transformations in the future.
268     """
269     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
270         raise ValueError("script_version filter is not limited to commit hashes")
271
272 def attr_filtered(filter_, *attr_names):
273     """Return True if filter_ applies to any of attr_names, else False."""
274     return any((name == 'any') or (name in attr_names)
275                for name in filter_iter(filter_[0]))
276
277 @contextlib.contextmanager
278 def exception_handler(handler, *exc_types):
279     """If any exc_types are raised in the block, call handler on the exception."""
280     try:
281         yield
282     except exc_types as error:
283         handler(error)
284
285
286 # copy_workflow(wf_uuid, src, dst, args)
287 #
288 #    Copies a workflow identified by wf_uuid from src to dst.
289 #
290 #    If args.recursive is True, also copy any collections
291 #      referenced in the workflow definition yaml.
292 #
293 #    The owner_uuid of the new workflow is set to any given
294 #      project_uuid or the user who copied the template.
295 #
296 #    Returns the copied workflow object.
297 #
298 def copy_workflow(wf_uuid, src, dst, args):
299     # fetch the workflow from the source instance
300     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
301
302     if not wf["definition"]:
303         logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
304
305     # copy collections and docker images
306     if args.recursive and wf["definition"]:
307         wf_def = yaml.safe_load(wf["definition"])
308         if wf_def is not None:
309             locations = []
310             docker_images = {}
311             graph = wf_def.get('$graph', None)
312             if graph is not None:
313                 workflow_collections(graph, locations, docker_images)
314             else:
315                 workflow_collections(wf_def, locations, docker_images)
316
317             if locations:
318                 copy_collections(locations, src, dst, args)
319
320             for image in docker_images:
321                 copy_docker_image(image, docker_images[image], src, dst, args)
322
323     # copy the workflow itself
324     del wf['uuid']
325     wf['owner_uuid'] = args.project_uuid
326
327     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
328                                              ["name", "=", wf["name"]]]).execute()
329     if len(existing["items"]) == 0:
330         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
331     else:
332         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
333
334
335 def workflow_collections(obj, locations, docker_images):
336     if isinstance(obj, dict):
337         loc = obj.get('location', None)
338         if loc is not None:
339             if loc.startswith("keep:"):
340                 locations.append(loc[5:])
341
342         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
343         if docker_image is not None:
344             ds = docker_image.split(":", 1)
345             tag = ds[1] if len(ds)==2 else 'latest'
346             docker_images[ds[0]] = tag
347
348         for x in obj:
349             workflow_collections(obj[x], locations, docker_images)
350     elif isinstance(obj, list):
351         for x in obj:
352             workflow_collections(x, locations, docker_images)
353
354 # copy_collections(obj, src, dst, args)
355 #
356 #    Recursively copies all collections referenced by 'obj' from src
357 #    to dst.  obj may be a dict or a list, in which case we run
358 #    copy_collections on every value it contains. If it is a string,
359 #    search it for any substring that matches a collection hash or uuid
360 #    (this will find hidden references to collections like
361 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
362 #
363 #    Returns a copy of obj with any old collection uuids replaced by
364 #    the new ones.
365 #
366 def copy_collections(obj, src, dst, args):
367
368     def copy_collection_fn(collection_match):
369         """Helper function for regex substitution: copies a single collection,
370         identified by the collection_match MatchObject, to the
371         destination.  Returns the destination collection uuid (or the
372         portable data hash if that's what src_id is).
373
374         """
375         src_id = collection_match.group(0)
376         if src_id not in collections_copied:
377             dst_col = copy_collection(src_id, src, dst, args)
378             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
379                 collections_copied[src_id] = src_id
380             else:
381                 collections_copied[src_id] = dst_col['uuid']
382         return collections_copied[src_id]
383
384     if isinstance(obj, basestring):
385         # Copy any collections identified in this string to dst, replacing
386         # them with the dst uuids as necessary.
387         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
388         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
389         return obj
390     elif isinstance(obj, dict):
391         return type(obj)((v, copy_collections(obj[v], src, dst, args))
392                          for v in obj)
393     elif isinstance(obj, list):
394         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
395     return obj
396
397
398 def total_collection_size(manifest_text):
399     """Return the total number of bytes in this collection (excluding
400     duplicate blocks)."""
401
402     total_bytes = 0
403     locators_seen = {}
404     for line in manifest_text.splitlines():
405         words = line.split()
406         for word in words[1:]:
407             try:
408                 loc = arvados.KeepLocator(word)
409             except ValueError:
410                 continue  # this word isn't a locator, skip it
411             if loc.md5sum not in locators_seen:
412                 locators_seen[loc.md5sum] = True
413                 total_bytes += loc.size
414
415     return total_bytes
416
417 def create_collection_from(c, src, dst, args):
418     """Create a new collection record on dst, and copy Docker metadata if
419     available."""
420
421     collection_uuid = c['uuid']
422     body = {}
423     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
424         body[d] = c[d]
425
426     if not body["name"]:
427         body['name'] = "copied from " + collection_uuid
428
429     if args.storage_classes:
430         body['storage_classes_desired'] = args.storage_classes
431
432     body['owner_uuid'] = args.project_uuid
433
434     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
435
436     # Create docker_image_repo+tag and docker_image_hash links
437     # at the destination.
438     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
439         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
440
441         for src_link in docker_links:
442             body = {key: src_link[key]
443                     for key in ['link_class', 'name', 'properties']}
444             body['head_uuid'] = dst_collection['uuid']
445             body['owner_uuid'] = args.project_uuid
446
447             lk = dst.links().create(body=body).execute(num_retries=args.retries)
448             logger.debug('created dst link {}'.format(lk))
449
450     return dst_collection
451
452 # copy_collection(obj_uuid, src, dst, args)
453 #
454 #    Copies the collection identified by obj_uuid from src to dst.
455 #    Returns the collection object created at dst.
456 #
457 #    If args.progress is True, produce a human-friendly progress
458 #    report.
459 #
460 #    If a collection with the desired portable_data_hash already
461 #    exists at dst, and args.force is False, copy_collection returns
462 #    the existing collection without copying any blocks.  Otherwise
463 #    (if no collection exists or if args.force is True)
464 #    copy_collection copies all of the collection data blocks from src
465 #    to dst.
466 #
467 #    For this application, it is critical to preserve the
468 #    collection's manifest hash, which is not guaranteed with the
469 #    arvados.CollectionReader and arvados.CollectionWriter classes.
470 #    Copying each block in the collection manually, followed by
471 #    the manifest block, ensures that the collection's manifest
472 #    hash will not change.
473 #
474 def copy_collection(obj_uuid, src, dst, args):
475     if arvados.util.keep_locator_pattern.match(obj_uuid):
476         # If the obj_uuid is a portable data hash, it might not be
477         # uniquely identified with a particular collection.  As a
478         # result, it is ambiguous as to what name to use for the copy.
479         # Apply some heuristics to pick which collection to get the
480         # name from.
481         srccol = src.collections().list(
482             filters=[['portable_data_hash', '=', obj_uuid]],
483             order="created_at asc"
484             ).execute(num_retries=args.retries)
485
486         items = srccol.get("items")
487
488         if not items:
489             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
490             return
491
492         c = None
493
494         if len(items) == 1:
495             # There's only one collection with the PDH, so use that.
496             c = items[0]
497         if not c:
498             # See if there is a collection that's in the same project
499             # as the root item (usually a workflow) being copied.
500             for i in items:
501                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
502                     c = i
503                     break
504         if not c:
505             # Didn't find any collections located in the same project, so
506             # pick the oldest collection that has a name assigned to it.
507             for i in items:
508                 if i.get("name"):
509                     c = i
510                     break
511         if not c:
512             # None of the collections have names (?!), so just pick the
513             # first one.
514             c = items[0]
515
516         # list() doesn't return manifest text (and we don't want it to,
517         # because we don't need the same maninfest text sent to us 50
518         # times) so go and retrieve the collection object directly
519         # which will include the manifest text.
520         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
521     else:
522         # Assume this is an actual collection uuid, so fetch it directly.
523         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
524
525     # If a collection with this hash already exists at the
526     # destination, and 'force' is not true, just return that
527     # collection.
528     if not args.force:
529         if 'portable_data_hash' in c:
530             colhash = c['portable_data_hash']
531         else:
532             colhash = c['uuid']
533         dstcol = dst.collections().list(
534             filters=[['portable_data_hash', '=', colhash]]
535         ).execute(num_retries=args.retries)
536         if dstcol['items_available'] > 0:
537             for d in dstcol['items']:
538                 if ((args.project_uuid == d['owner_uuid']) and
539                     (c.get('name') == d['name']) and
540                     (c['portable_data_hash'] == d['portable_data_hash'])):
541                     return d
542             c['manifest_text'] = dst.collections().get(
543                 uuid=dstcol['items'][0]['uuid']
544             ).execute(num_retries=args.retries)['manifest_text']
545             return create_collection_from(c, src, dst, args)
546
547     # Fetch the collection's manifest.
548     manifest = c['manifest_text']
549     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
550
551     # Copy each block from src_keep to dst_keep.
552     # Use the newly signed locators returned from dst_keep to build
553     # a new manifest as we go.
554     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
555     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
556     dst_manifest = io.StringIO()
557     dst_locators = {}
558     bytes_written = 0
559     bytes_expected = total_collection_size(manifest)
560     if args.progress:
561         progress_writer = ProgressWriter(human_progress)
562     else:
563         progress_writer = None
564
565     for line in manifest.splitlines():
566         words = line.split()
567         dst_manifest.write(words[0])
568         for word in words[1:]:
569             try:
570                 loc = arvados.KeepLocator(word)
571             except ValueError:
572                 # If 'word' can't be parsed as a locator,
573                 # presume it's a filename.
574                 dst_manifest.write(' ')
575                 dst_manifest.write(word)
576                 continue
577             blockhash = loc.md5sum
578             # copy this block if we haven't seen it before
579             # (otherwise, just reuse the existing dst_locator)
580             if blockhash not in dst_locators:
581                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
582                 if progress_writer:
583                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
584                 data = src_keep.get(word)
585                 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
586                 dst_locators[blockhash] = dst_locator
587                 bytes_written += loc.size
588             dst_manifest.write(' ')
589             dst_manifest.write(dst_locators[blockhash])
590         dst_manifest.write("\n")
591
592     if progress_writer:
593         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
594         progress_writer.finish()
595
596     # Copy the manifest and save the collection.
597     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
598
599     c['manifest_text'] = dst_manifest.getvalue()
600     return create_collection_from(c, src, dst, args)
601
602 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
603     r = api.repositories().list(
604         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
605     if r['items_available'] != 1:
606         raise Exception('cannot identify repo {}; {} repos found'
607                         .format(repo_name, r['items_available']))
608
609     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
610     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
611     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
612
613     priority = https_url + other_url + http_url
614
615     git_config = []
616     git_url = None
617     for url in priority:
618         if url.startswith("http"):
619             u = urllib.parse.urlsplit(url)
620             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
621             git_config = ["-c", "credential.%s/.username=none" % baseurl,
622                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
623         else:
624             git_config = []
625
626         try:
627             logger.debug("trying %s", url)
628             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
629                                       env={"HOME": os.environ["HOME"],
630                                            "ARVADOS_API_TOKEN": api.api_token,
631                                            "GIT_ASKPASS": "/bin/false"})
632         except arvados.errors.CommandFailedError:
633             pass
634         else:
635             git_url = url
636             break
637
638     if not git_url:
639         raise Exception('Cannot access git repository, tried {}'
640                         .format(priority))
641
642     if git_url.startswith("http:"):
643         if allow_insecure_http:
644             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
645         else:
646             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
647
648     return (git_url, git_config)
649
650
651 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
652     """Copy the docker image identified by docker_image and
653     docker_image_tag from src to dst. Create appropriate
654     docker_image_repo+tag and docker_image_hash links at dst.
655
656     """
657
658     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
659
660     # Find the link identifying this docker image.
661     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
662         src, args.retries, docker_image, docker_image_tag)
663     if docker_image_list:
664         image_uuid, image_info = docker_image_list[0]
665         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
666
667         # Copy the collection it refers to.
668         dst_image_col = copy_collection(image_uuid, src, dst, args)
669     elif arvados.util.keep_locator_pattern.match(docker_image):
670         dst_image_col = copy_collection(docker_image, src, dst, args)
671     else:
672         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
673
674 def copy_project(obj_uuid, src, dst, owner_uuid, args):
675
676     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
677
678     # Create/update the destination project
679     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
680                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
681     if len(existing["items"]) == 0:
682         project_record = dst.groups().create(body={"group": {"group_class": "project",
683                                                              "owner_uuid": owner_uuid,
684                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
685     else:
686         project_record = existing["items"][0]
687
688     dst.groups().update(uuid=project_record["uuid"],
689                         body={"group": {
690                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
691
692     args.project_uuid = project_record["uuid"]
693
694     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
695
696
697     partial_error = ""
698
699     # Copy collections
700     try:
701         copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
702                          src, dst, args)
703     except Exception as e:
704         partial_error += "\n" + str(e)
705
706     # Copy workflows
707     for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
708         try:
709             copy_workflow(w["uuid"], src, dst, args)
710         except Exception as e:
711             partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
712
713     if args.recursive:
714         for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
715             try:
716                 copy_project(g["uuid"], src, dst, project_record["uuid"], args)
717             except Exception as e:
718                 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
719
720     project_record["partial_error"] = partial_error
721
722     return project_record
723
724 # git_rev_parse(rev, repo)
725 #
726 #    Returns the 40-character commit hash corresponding to 'rev' in
727 #    git repository 'repo' (which must be the path of a local git
728 #    repository)
729 #
730 def git_rev_parse(rev, repo):
731     gitout, giterr = arvados.util.run_command(
732         ['git', 'rev-parse', rev], cwd=repo)
733     return gitout.strip()
734
735 # uuid_type(api, object_uuid)
736 #
737 #    Returns the name of the class that object_uuid belongs to, based on
738 #    the second field of the uuid.  This function consults the api's
739 #    schema to identify the object class.
740 #
741 #    It returns a string such as 'Collection', 'Workflow', etc.
742 #
743 #    Special case: if handed a Keep locator hash, return 'Collection'.
744 #
745 def uuid_type(api, object_uuid):
746     if re.match(arvados.util.keep_locator_pattern, object_uuid):
747         return 'Collection'
748     p = object_uuid.split('-')
749     if len(p) == 3:
750         type_prefix = p[1]
751         for k in api._schema.schemas:
752             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
753             if type_prefix == obj_class:
754                 return k
755     return None
756
757 def abort(msg, code=1):
758     logger.info("arv-copy: %s", msg)
759     exit(code)
760
761
762 # Code for reporting on the progress of a collection upload.
763 # Stolen from arvados.commands.put.ArvPutCollectionWriter
764 # TODO(twp): figure out how to refactor into a shared library
765 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
766 # code)
767
768 def machine_progress(obj_uuid, bytes_written, bytes_expected):
769     return "{} {}: {} {} written {} total\n".format(
770         sys.argv[0],
771         os.getpid(),
772         obj_uuid,
773         bytes_written,
774         -1 if (bytes_expected is None) else bytes_expected)
775
776 def human_progress(obj_uuid, bytes_written, bytes_expected):
777     if bytes_expected:
778         return "\r{}: {}M / {}M {:.1%} ".format(
779             obj_uuid,
780             bytes_written >> 20, bytes_expected >> 20,
781             float(bytes_written) / bytes_expected)
782     else:
783         return "\r{}: {} ".format(obj_uuid, bytes_written)
784
785 class ProgressWriter(object):
786     _progress_func = None
787     outfile = sys.stderr
788
789     def __init__(self, progress_func):
790         self._progress_func = progress_func
791
792     def report(self, obj_uuid, bytes_written, bytes_expected):
793         if self._progress_func is not None:
794             self.outfile.write(
795                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
796
797     def finish(self):
798         self.outfile.write("\n")
799
800 if __name__ == '__main__':
801     main()