6605: Use git -c instead of "git config" to set credentials. Refuse to use
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27 import urlparse
28
29 import arvados
30 import arvados.config
31 import arvados.keep
32 import arvados.util
33 import arvados.commands._util as arv_cmd
34 import arvados.commands.keepdocker
35
36 from arvados.api import OrderedJsonModel
37
38 logger = logging.getLogger('arvados.arv-copy')
39
40 # local_repo_dir records which git repositories from the Arvados source
41 # instance have been checked out locally during this run, and to which
42 # directories.
43 # e.g. if repository 'twp' from src_arv has been cloned into
44 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
45 #
46 local_repo_dir = {}
47
48 # List of collections that have been copied in this session, and their
49 # destination collection UUIDs.
50 collections_copied = {}
51
52 # Set of (repository, script_version) two-tuples of commits copied in git.
53 scripts_copied = set()
54
55 # The owner_uuid of the object being copied
56 src_owner_uuid = None
57
58 def main():
59     copy_opts = argparse.ArgumentParser(add_help=False)
60
61     copy_opts.add_argument(
62         '-v', '--verbose', dest='verbose', action='store_true',
63         help='Verbose output.')
64     copy_opts.add_argument(
65         '--progress', dest='progress', action='store_true',
66         help='Report progress on copying collections. (default)')
67     copy_opts.add_argument(
68         '--no-progress', dest='progress', action='store_false',
69         help='Do not report progress on copying collections.')
70     copy_opts.add_argument(
71         '-f', '--force', dest='force', action='store_true',
72         help='Perform copy even if the object appears to exist at the remote destination.')
73     copy_opts.add_argument(
74         '--src', dest='source_arvados', required=True,
75         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
76     copy_opts.add_argument(
77         '--dst', dest='destination_arvados', required=True,
78         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
79     copy_opts.add_argument(
80         '--recursive', dest='recursive', action='store_true',
81         help='Recursively copy any dependencies for this object. (default)')
82     copy_opts.add_argument(
83         '--no-recursive', dest='recursive', action='store_false',
84         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
85     copy_opts.add_argument(
86         '--dst-git-repo', dest='dst_git_repo',
87         help='The name of the destination git repository. Required when copying a pipeline recursively.')
88     copy_opts.add_argument(
89         '--project-uuid', dest='project_uuid',
90         help='The UUID of the project at the destination to which the pipeline should be copied.')
91     copy_opts.add_argument(
92         'object_uuid',
93         help='The UUID of the object to be copied.')
94     copy_opts.set_defaults(progress=True)
95     copy_opts.set_defaults(recursive=True)
96
97     parser = argparse.ArgumentParser(
98         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
99         parents=[copy_opts, arv_cmd.retry_opt])
100     args = parser.parse_args()
101
102     if args.verbose:
103         logger.setLevel(logging.DEBUG)
104     else:
105         logger.setLevel(logging.INFO)
106
107     # Create API clients for the source and destination instances
108     src_arv = api_for_instance(args.source_arvados)
109     dst_arv = api_for_instance(args.destination_arvados)
110
111     if not args.project_uuid:
112         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
113
114     # Identify the kind of object we have been given, and begin copying.
115     t = uuid_type(src_arv, args.object_uuid)
116     if t == 'Collection':
117         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
118         result = copy_collection(args.object_uuid,
119                                  src_arv, dst_arv,
120                                  args)
121     elif t == 'PipelineInstance':
122         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
123         result = copy_pipeline_instance(args.object_uuid,
124                                         src_arv, dst_arv,
125                                         args)
126     elif t == 'PipelineTemplate':
127         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
128         result = copy_pipeline_template(args.object_uuid,
129                                         src_arv, dst_arv, args)
130     else:
131         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
132
133     # Clean up any outstanding temp git repositories.
134     for d in local_repo_dir.values():
135         shutil.rmtree(d, ignore_errors=True)
136
137     # If no exception was thrown and the response does not have an
138     # error_token field, presume success
139     if 'error_token' in result or 'uuid' not in result:
140         logger.error("API server returned an error result: {}".format(result))
141         exit(1)
142
143     logger.info("")
144     logger.info("Success: created copy with uuid {}".format(result['uuid']))
145     exit(0)
146
147 def set_src_owner_uuid(resource, uuid, args):
148     global src_owner_uuid
149     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
150     src_owner_uuid = c.get("owner_uuid")
151
152 # api_for_instance(instance_name)
153 #
154 #     Creates an API client for the Arvados instance identified by
155 #     instance_name.
156 #
157 #     If instance_name contains a slash, it is presumed to be a path
158 #     (either local or absolute) to a file with Arvados configuration
159 #     settings.
160 #
161 #     Otherwise, it is presumed to be the name of a file in
162 #     $HOME/.config/arvados/instance_name.conf
163 #
164 def api_for_instance(instance_name):
165     if '/' in instance_name:
166         config_file = instance_name
167     else:
168         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
169
170     try:
171         cfg = arvados.config.load(config_file)
172     except (IOError, OSError) as e:
173         abort(("Could not open config file {}: {}\n" +
174                "You must make sure that your configuration tokens\n" +
175                "for Arvados instance {} are in {} and that this\n" +
176                "file is readable.").format(
177                    config_file, e, instance_name, config_file))
178
179     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
180         api_is_insecure = (
181             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
182                 ['1', 't', 'true', 'y', 'yes']))
183         client = arvados.api('v1',
184                              host=cfg['ARVADOS_API_HOST'],
185                              token=cfg['ARVADOS_API_TOKEN'],
186                              insecure=api_is_insecure,
187                              model=OrderedJsonModel())
188     else:
189         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
190     return client
191
192 # Check if git is available
193 def check_git_availability():
194     try:
195         arvados.util.run_command(['git', '--help'])
196     except Exception:
197         abort('git command is not available. Please ensure git is installed.')
198
199 # copy_pipeline_instance(pi_uuid, src, dst, args)
200 #
201 #    Copies a pipeline instance identified by pi_uuid from src to dst.
202 #
203 #    If the args.recursive option is set:
204 #      1. Copies all input collections
205 #           * For each component in the pipeline, include all collections
206 #             listed as job dependencies for that component)
207 #      2. Copy docker images
208 #      3. Copy git repositories
209 #      4. Copy the pipeline template
210 #
211 #    The only changes made to the copied pipeline instance are:
212 #      1. The original pipeline instance UUID is preserved in
213 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
214 #      2. The pipeline_template_uuid is changed to the new template uuid.
215 #      3. The owner_uuid of the instance is changed to the user who
216 #         copied it.
217 #
218 def copy_pipeline_instance(pi_uuid, src, dst, args):
219     # Fetch the pipeline instance record.
220     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
221
222     if args.recursive:
223         check_git_availability()
224
225         if not args.dst_git_repo:
226             abort('--dst-git-repo is required when copying a pipeline recursively.')
227         # Copy the pipeline template and save the copied template.
228         if pi.get('pipeline_template_uuid', None):
229             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
230                                         src, dst, args)
231
232         # Copy input collections, docker images and git repos.
233         pi = copy_collections(pi, src, dst, args)
234         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
235         copy_docker_images(pi, src, dst, args)
236
237         # Update the fields of the pipeline instance with the copied
238         # pipeline template.
239         if pi.get('pipeline_template_uuid', None):
240             pi['pipeline_template_uuid'] = pt['uuid']
241
242     else:
243         # not recursive
244         logger.info("Copying only pipeline instance %s.", pi_uuid)
245         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
246
247     # Update the pipeline instance properties, and create the new
248     # instance at dst.
249     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
250     pi['description'] = "Pipeline copied from {}\n\n{}".format(
251         pi_uuid,
252         pi['description'] if pi.get('description', None) else '')
253
254     pi['owner_uuid'] = args.project_uuid
255
256     del pi['uuid']
257
258     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
259     return new_pi
260
261 # copy_pipeline_template(pt_uuid, src, dst, args)
262 #
263 #    Copies a pipeline template identified by pt_uuid from src to dst.
264 #
265 #    If args.recursive is True, also copy any collections, docker
266 #    images and git repositories that this template references.
267 #
268 #    The owner_uuid of the new template is changed to that of the user
269 #    who copied the template.
270 #
271 #    Returns the copied pipeline template object.
272 #
273 def copy_pipeline_template(pt_uuid, src, dst, args):
274     # fetch the pipeline template from the source instance
275     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
276
277     if args.recursive:
278         check_git_availability()
279
280         if not args.dst_git_repo:
281             abort('--dst-git-repo is required when copying a pipeline recursively.')
282         # Copy input collections, docker images and git repos.
283         pt = copy_collections(pt, src, dst, args)
284         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
285         copy_docker_images(pt, src, dst, args)
286
287     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
288         pt_uuid,
289         pt['description'] if pt.get('description', None) else '')
290     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
291     del pt['uuid']
292
293     pt['owner_uuid'] = args.project_uuid
294
295     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
296
297 # copy_collections(obj, src, dst, args)
298 #
299 #    Recursively copies all collections referenced by 'obj' from src
300 #    to dst.  obj may be a dict or a list, in which case we run
301 #    copy_collections on every value it contains. If it is a string,
302 #    search it for any substring that matches a collection hash or uuid
303 #    (this will find hidden references to collections like
304 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
305 #
306 #    Returns a copy of obj with any old collection uuids replaced by
307 #    the new ones.
308 #
309 def copy_collections(obj, src, dst, args):
310
311     def copy_collection_fn(collection_match):
312         """Helper function for regex substitution: copies a single collection,
313         identified by the collection_match MatchObject, to the
314         destination.  Returns the destination collection uuid (or the
315         portable data hash if that's what src_id is).
316
317         """
318         src_id = collection_match.group(0)
319         if src_id not in collections_copied:
320             dst_col = copy_collection(src_id, src, dst, args)
321             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
322                 collections_copied[src_id] = src_id
323             else:
324                 collections_copied[src_id] = dst_col['uuid']
325         return collections_copied[src_id]
326
327     if isinstance(obj, basestring):
328         # Copy any collections identified in this string to dst, replacing
329         # them with the dst uuids as necessary.
330         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
331         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
332         return obj
333     elif isinstance(obj, dict):
334         return type(obj)((v, copy_collections(obj[v], src, dst, args))
335                          for v in obj)
336     elif isinstance(obj, list):
337         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
338     return obj
339
340 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
341     """Copy a job's script to the destination repository, and update its record.
342
343     Given a jobspec dictionary, this function finds the referenced script from
344     src and copies it to dst and dst_repo.  It also updates jobspec in place to
345     refer to names on the destination.
346     """
347     repo = jobspec.get('repository')
348     if repo is None:
349         return
350     # script_version is the "script_version" parameter from the source
351     # component or job.  If no script_version was supplied in the
352     # component or job, it is a mistake in the pipeline, but for the
353     # purposes of copying the repository, default to "master".
354     script_version = jobspec.get('script_version') or 'master'
355     script_key = (repo, script_version)
356     if script_key not in scripts_copied:
357         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
358         scripts_copied.add(script_key)
359     jobspec['repository'] = dst_repo
360     repo_dir = local_repo_dir[repo]
361     for version_key in ['script_version', 'supplied_script_version']:
362         if version_key in jobspec:
363             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
364
365 # copy_git_repos(p, src, dst, dst_repo, args)
366 #
367 #    Copies all git repositories referenced by pipeline instance or
368 #    template 'p' from src to dst.
369 #
370 #    For each component c in the pipeline:
371 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
372 #      * Rename script versions:
373 #          * c['script_version']
374 #          * c['job']['script_version']
375 #          * c['job']['supplied_script_version']
376 #        to the commit hashes they resolve to, since any symbolic
377 #        names (tags, branches) are not preserved in the destination repo.
378 #
379 #    The pipeline object is updated in place with the new repository
380 #    names.  The return value is undefined.
381 #
382 def copy_git_repos(p, src, dst, dst_repo, args):
383     for component in p['components'].itervalues():
384         migrate_jobspec(component, src, dst, dst_repo, args)
385         if 'job' in component:
386             migrate_jobspec(component['job'], src, dst, dst_repo, args)
387
388 def total_collection_size(manifest_text):
389     """Return the total number of bytes in this collection (excluding
390     duplicate blocks)."""
391
392     total_bytes = 0
393     locators_seen = {}
394     for line in manifest_text.splitlines():
395         words = line.split()
396         for word in words[1:]:
397             try:
398                 loc = arvados.KeepLocator(word)
399             except ValueError:
400                 continue  # this word isn't a locator, skip it
401             if loc.md5sum not in locators_seen:
402                 locators_seen[loc.md5sum] = True
403                 total_bytes += loc.size
404
405     return total_bytes
406
407 def create_collection_from(c, src, dst, args):
408     """Create a new collection record on dst, and copy Docker metadata if
409     available."""
410
411     collection_uuid = c['uuid']
412     del c['uuid']
413
414     if not c["name"]:
415         c['name'] = "copied from " + collection_uuid
416
417     if 'properties' in c:
418         del c['properties']
419
420     c['owner_uuid'] = args.project_uuid
421
422     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
423
424     # Create docker_image_repo+tag and docker_image_hash links
425     # at the destination.
426     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
427         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
428
429         for src_link in docker_links:
430             body = {key: src_link[key]
431                     for key in ['link_class', 'name', 'properties']}
432             body['head_uuid'] = dst_collection['uuid']
433             body['owner_uuid'] = args.project_uuid
434
435             lk = dst.links().create(body=body).execute(num_retries=args.retries)
436             logger.debug('created dst link {}'.format(lk))
437
438     return dst_collection
439
440 # copy_collection(obj_uuid, src, dst, args)
441 #
442 #    Copies the collection identified by obj_uuid from src to dst.
443 #    Returns the collection object created at dst.
444 #
445 #    If args.progress is True, produce a human-friendly progress
446 #    report.
447 #
448 #    If a collection with the desired portable_data_hash already
449 #    exists at dst, and args.force is False, copy_collection returns
450 #    the existing collection without copying any blocks.  Otherwise
451 #    (if no collection exists or if args.force is True)
452 #    copy_collection copies all of the collection data blocks from src
453 #    to dst.
454 #
455 #    For this application, it is critical to preserve the
456 #    collection's manifest hash, which is not guaranteed with the
457 #    arvados.CollectionReader and arvados.CollectionWriter classes.
458 #    Copying each block in the collection manually, followed by
459 #    the manifest block, ensures that the collection's manifest
460 #    hash will not change.
461 #
462 def copy_collection(obj_uuid, src, dst, args):
463     if arvados.util.keep_locator_pattern.match(obj_uuid):
464         # If the obj_uuid is a portable data hash, it might not be uniquely
465         # identified with a particular collection.  As a result, it is
466         # ambigious as to what name to use for the copy.  Apply some heuristics
467         # to pick which collection to get the name from.
468         srccol = src.collections().list(
469             filters=[['portable_data_hash', '=', obj_uuid]],
470             order="created_at asc"
471             ).execute(num_retries=args.retries)
472
473         items = srccol.get("items")
474
475         if not items:
476             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
477             return
478
479         c = None
480
481         if len(items) == 1:
482             # There's only one collection with the PDH, so use that.
483             c = items[0]
484         if not c:
485             # See if there is a collection that's in the same project
486             # as the root item (usually a pipeline) being copied.
487             for i in items:
488                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
489                     c = i
490                     break
491         if not c:
492             # Didn't find any collections located in the same project, so
493             # pick the oldest collection that has a name assigned to it.
494             for i in items:
495                 if i.get("name"):
496                     c = i
497                     break
498         if not c:
499             # None of the collections have names (?!), so just pick the
500             # first one.
501             c = items[0]
502
503         # list() doesn't return manifest text (and we don't want it to,
504         # because we don't need the same maninfest text sent to us 50
505         # times) so go and retrieve the collection object directly
506         # which will include the manifest text.
507         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
508     else:
509         # Assume this is an actual collection uuid, so fetch it directly.
510         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
511
512     # If a collection with this hash already exists at the
513     # destination, and 'force' is not true, just return that
514     # collection.
515     if not args.force:
516         if 'portable_data_hash' in c:
517             colhash = c['portable_data_hash']
518         else:
519             colhash = c['uuid']
520         dstcol = dst.collections().list(
521             filters=[['portable_data_hash', '=', colhash]]
522         ).execute(num_retries=args.retries)
523         if dstcol['items_available'] > 0:
524             for d in dstcol['items']:
525                 if ((args.project_uuid == d['owner_uuid']) and
526                     (c.get('name') == d['name']) and
527                     (c['portable_data_hash'] == d['portable_data_hash'])):
528                     return d
529             c['manifest_text'] = dst.collections().get(
530                 uuid=dstcol['items'][0]['uuid']
531             ).execute(num_retries=args.retries)['manifest_text']
532             return create_collection_from(c, src, dst, args)
533
534     # Fetch the collection's manifest.
535     manifest = c['manifest_text']
536     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
537
538     # Copy each block from src_keep to dst_keep.
539     # Use the newly signed locators returned from dst_keep to build
540     # a new manifest as we go.
541     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
542     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
543     dst_manifest = ""
544     dst_locators = {}
545     bytes_written = 0
546     bytes_expected = total_collection_size(manifest)
547     if args.progress:
548         progress_writer = ProgressWriter(human_progress)
549     else:
550         progress_writer = None
551
552     for line in manifest.splitlines():
553         words = line.split()
554         dst_manifest += words[0]
555         for word in words[1:]:
556             try:
557                 loc = arvados.KeepLocator(word)
558             except ValueError:
559                 # If 'word' can't be parsed as a locator,
560                 # presume it's a filename.
561                 dst_manifest += ' ' + word
562                 continue
563             blockhash = loc.md5sum
564             # copy this block if we haven't seen it before
565             # (otherwise, just reuse the existing dst_locator)
566             if blockhash not in dst_locators:
567                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
568                 if progress_writer:
569                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
570                 data = src_keep.get(word)
571                 dst_locator = dst_keep.put(data)
572                 dst_locators[blockhash] = dst_locator
573                 bytes_written += loc.size
574             dst_manifest += ' ' + dst_locators[blockhash]
575         dst_manifest += "\n"
576
577     if progress_writer:
578         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
579         progress_writer.finish()
580
581     # Copy the manifest and save the collection.
582     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
583
584     c['manifest_text'] = dst_manifest
585     return create_collection_from(c, src, dst, args)
586
587 def select_git_url(api, repo_name, retries):
588     r = api.repositories().list(
589         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
590     if r['items_available'] != 1:
591         raise Exception('cannot identify repo {}; {} repos found'
592                         .format(repo_name, r['items_available']))
593
594     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
595     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
596     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
597
598     priority = https_url + other_url + http_url
599
600     git_config = []
601     git_url = None
602     for url in priority:
603         if url.startswith("http"):
604             u = urlparse.urlsplit(url)
605             baseurl = urlparse.urlunsplit((u.scheme, u.netloc, "", "", ""))
606             git_config = ["-c", "credential.%s/.username=none" % baseurl,
607                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
608         else:
609             git_config = []
610
611         try:
612             logger.debug("trying %s", url)
613             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
614                                       env={"HOME": os.environ["HOME"],
615                                            "ARVADOS_API_TOKEN": api.api_token,
616                                            "GIT_ASKPASS": "/bin/false"})
617         except arvados.errors.CommandFailedError:
618             pass
619         else:
620             git_url = url
621             break
622
623     if not git_url:
624         raise Exception('Cannot access git repository, tried {}'
625                         .format(priority))
626
627     if git_url.startswith("http:"):
628         if api.insecure:
629             logger.warn("Using insecure git url %s but will allow this because ARVADOS_API_HOST_INSECURE is true.", git_url)
630         else:
631             raise Exception("Refusing to use insecure git url %s, set ARVADOS_API_HOST_INSECURE if you really want this." % git_url)
632
633     return (git_url, git_config)
634
635
636 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
637 #
638 #    Copies commits from git repository 'src_git_repo' on Arvados
639 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
640 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
641 #    or "jsmith")
642 #
643 #    All commits will be copied to a destination branch named for the
644 #    source repository URL.
645 #
646 #    The destination repository must already exist.
647 #
648 #    The user running this command must be authenticated
649 #    to both repositories.
650 #
651 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
652     # Identify the fetch and push URLs for the git repositories.
653
654     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries)
655     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries)
656
657     logger.debug('src_git_url: {}'.format(src_git_url))
658     logger.debug('dst_git_url: {}'.format(dst_git_url))
659
660     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
661
662     # Copy git commits from src repo to dst repo.
663     if src_git_repo not in local_repo_dir:
664         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
665         arvados.util.run_command(
666             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
667              local_repo_dir[src_git_repo]],
668             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
669             env={"HOME": os.environ["HOME"],
670                  "ARVADOS_API_TOKEN": src.api_token,
671                  "GIT_ASKPASS": "/bin/false"})
672         arvados.util.run_command(
673             ["git", "remote", "add", "dst", dst_git_url],
674             cwd=local_repo_dir[src_git_repo])
675     arvados.util.run_command(
676         ["git", "branch", dst_branch, script_version],
677         cwd=local_repo_dir[src_git_repo])
678     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
679                              cwd=local_repo_dir[src_git_repo],
680                              env={"HOME": os.environ["HOME"],
681                                   "ARVADOS_API_TOKEN": dst.api_token,
682                                   "GIT_ASKPASS": "/bin/false"})
683
684 def copy_docker_images(pipeline, src, dst, args):
685     """Copy any docker images named in the pipeline components'
686     runtime_constraints field from src to dst."""
687
688     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
689     for c_name, c_info in pipeline['components'].iteritems():
690         if ('runtime_constraints' in c_info and
691             'docker_image' in c_info['runtime_constraints']):
692             copy_docker_image(
693                 c_info['runtime_constraints']['docker_image'],
694                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
695                 src, dst, args)
696
697
698 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
699     """Copy the docker image identified by docker_image and
700     docker_image_tag from src to dst. Create appropriate
701     docker_image_repo+tag and docker_image_hash links at dst.
702
703     """
704
705     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
706
707     # Find the link identifying this docker image.
708     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
709         src, args.retries, docker_image, docker_image_tag)
710     if docker_image_list:
711         image_uuid, image_info = docker_image_list[0]
712         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
713
714         # Copy the collection it refers to.
715         dst_image_col = copy_collection(image_uuid, src, dst, args)
716     elif arvados.util.keep_locator_pattern.match(docker_image):
717         dst_image_col = copy_collection(docker_image, src, dst, args)
718     else:
719         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
720
721 # git_rev_parse(rev, repo)
722 #
723 #    Returns the 40-character commit hash corresponding to 'rev' in
724 #    git repository 'repo' (which must be the path of a local git
725 #    repository)
726 #
727 def git_rev_parse(rev, repo):
728     gitout, giterr = arvados.util.run_command(
729         ['git', 'rev-parse', rev], cwd=repo)
730     return gitout.strip()
731
732 # uuid_type(api, object_uuid)
733 #
734 #    Returns the name of the class that object_uuid belongs to, based on
735 #    the second field of the uuid.  This function consults the api's
736 #    schema to identify the object class.
737 #
738 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
739 #
740 #    Special case: if handed a Keep locator hash, return 'Collection'.
741 #
742 def uuid_type(api, object_uuid):
743     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
744         return 'Collection'
745     p = object_uuid.split('-')
746     if len(p) == 3:
747         type_prefix = p[1]
748         for k in api._schema.schemas:
749             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
750             if type_prefix == obj_class:
751                 return k
752     return None
753
754 def abort(msg, code=1):
755     logger.info("arv-copy: %s", msg)
756     exit(code)
757
758
759 # Code for reporting on the progress of a collection upload.
760 # Stolen from arvados.commands.put.ArvPutCollectionWriter
761 # TODO(twp): figure out how to refactor into a shared library
762 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
763 # code)
764
765 def machine_progress(obj_uuid, bytes_written, bytes_expected):
766     return "{} {}: {} {} written {} total\n".format(
767         sys.argv[0],
768         os.getpid(),
769         obj_uuid,
770         bytes_written,
771         -1 if (bytes_expected is None) else bytes_expected)
772
773 def human_progress(obj_uuid, bytes_written, bytes_expected):
774     if bytes_expected:
775         return "\r{}: {}M / {}M {:.1%} ".format(
776             obj_uuid,
777             bytes_written >> 20, bytes_expected >> 20,
778             float(bytes_written) / bytes_expected)
779     else:
780         return "\r{}: {} ".format(obj_uuid, bytes_written)
781
782 class ProgressWriter(object):
783     _progress_func = None
784     outfile = sys.stderr
785
786     def __init__(self, progress_func):
787         self._progress_func = progress_func
788
789     def report(self, obj_uuid, bytes_written, bytes_expected):
790         if self._progress_func is not None:
791             self.outfile.write(
792                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
793
794     def finish(self):
795         self.outfile.write("\n")
796
797 if __name__ == '__main__':
798     main()