Merge branch 'master' into 6518-crunch2-dispatch-slurm
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27 import urlparse
28
29 import arvados
30 import arvados.config
31 import arvados.keep
32 import arvados.util
33 import arvados.commands._util as arv_cmd
34 import arvados.commands.keepdocker
35
36 from arvados.api import OrderedJsonModel
37
38 logger = logging.getLogger('arvados.arv-copy')
39
40 # local_repo_dir records which git repositories from the Arvados source
41 # instance have been checked out locally during this run, and to which
42 # directories.
43 # e.g. if repository 'twp' from src_arv has been cloned into
44 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
45 #
46 local_repo_dir = {}
47
48 # List of collections that have been copied in this session, and their
49 # destination collection UUIDs.
50 collections_copied = {}
51
52 # Set of (repository, script_version) two-tuples of commits copied in git.
53 scripts_copied = set()
54
55 # The owner_uuid of the object being copied
56 src_owner_uuid = None
57
58 def main():
59     copy_opts = argparse.ArgumentParser(add_help=False)
60
61     copy_opts.add_argument(
62         '-v', '--verbose', dest='verbose', action='store_true',
63         help='Verbose output.')
64     copy_opts.add_argument(
65         '--progress', dest='progress', action='store_true',
66         help='Report progress on copying collections. (default)')
67     copy_opts.add_argument(
68         '--no-progress', dest='progress', action='store_false',
69         help='Do not report progress on copying collections.')
70     copy_opts.add_argument(
71         '-f', '--force', dest='force', action='store_true',
72         help='Perform copy even if the object appears to exist at the remote destination.')
73     copy_opts.add_argument(
74         '--src', dest='source_arvados', required=True,
75         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
76     copy_opts.add_argument(
77         '--dst', dest='destination_arvados', required=True,
78         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
79     copy_opts.add_argument(
80         '--recursive', dest='recursive', action='store_true',
81         help='Recursively copy any dependencies for this object. (default)')
82     copy_opts.add_argument(
83         '--no-recursive', dest='recursive', action='store_false',
84         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
85     copy_opts.add_argument(
86         '--dst-git-repo', dest='dst_git_repo',
87         help='The name of the destination git repository. Required when copying a pipeline recursively.')
88     copy_opts.add_argument(
89         '--project-uuid', dest='project_uuid',
90         help='The UUID of the project at the destination to which the pipeline should be copied.')
91     copy_opts.add_argument(
92         '--allow-git-http-src', action="store_true",
93         help='Allow cloning git repositories over insecure http')
94     copy_opts.add_argument(
95         '--allow-git-http-dst', action="store_true",
96         help='Allow pushing git repositories over insecure http')
97
98     copy_opts.add_argument(
99         'object_uuid',
100         help='The UUID of the object to be copied.')
101     copy_opts.set_defaults(progress=True)
102     copy_opts.set_defaults(recursive=True)
103
104     parser = argparse.ArgumentParser(
105         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
106         parents=[copy_opts, arv_cmd.retry_opt])
107     args = parser.parse_args()
108
109     if args.verbose:
110         logger.setLevel(logging.DEBUG)
111     else:
112         logger.setLevel(logging.INFO)
113
114     # Create API clients for the source and destination instances
115     src_arv = api_for_instance(args.source_arvados)
116     dst_arv = api_for_instance(args.destination_arvados)
117
118     if not args.project_uuid:
119         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
120
121     # Identify the kind of object we have been given, and begin copying.
122     t = uuid_type(src_arv, args.object_uuid)
123     if t == 'Collection':
124         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
125         result = copy_collection(args.object_uuid,
126                                  src_arv, dst_arv,
127                                  args)
128     elif t == 'PipelineInstance':
129         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
130         result = copy_pipeline_instance(args.object_uuid,
131                                         src_arv, dst_arv,
132                                         args)
133     elif t == 'PipelineTemplate':
134         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
135         result = copy_pipeline_template(args.object_uuid,
136                                         src_arv, dst_arv, args)
137     else:
138         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
139
140     # Clean up any outstanding temp git repositories.
141     for d in local_repo_dir.values():
142         shutil.rmtree(d, ignore_errors=True)
143
144     # If no exception was thrown and the response does not have an
145     # error_token field, presume success
146     if 'error_token' in result or 'uuid' not in result:
147         logger.error("API server returned an error result: {}".format(result))
148         exit(1)
149
150     logger.info("")
151     logger.info("Success: created copy with uuid {}".format(result['uuid']))
152     exit(0)
153
154 def set_src_owner_uuid(resource, uuid, args):
155     global src_owner_uuid
156     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
157     src_owner_uuid = c.get("owner_uuid")
158
159 # api_for_instance(instance_name)
160 #
161 #     Creates an API client for the Arvados instance identified by
162 #     instance_name.
163 #
164 #     If instance_name contains a slash, it is presumed to be a path
165 #     (either local or absolute) to a file with Arvados configuration
166 #     settings.
167 #
168 #     Otherwise, it is presumed to be the name of a file in
169 #     $HOME/.config/arvados/instance_name.conf
170 #
171 def api_for_instance(instance_name):
172     if '/' in instance_name:
173         config_file = instance_name
174     else:
175         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
176
177     try:
178         cfg = arvados.config.load(config_file)
179     except (IOError, OSError) as e:
180         abort(("Could not open config file {}: {}\n" +
181                "You must make sure that your configuration tokens\n" +
182                "for Arvados instance {} are in {} and that this\n" +
183                "file is readable.").format(
184                    config_file, e, instance_name, config_file))
185
186     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
187         api_is_insecure = (
188             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
189                 ['1', 't', 'true', 'y', 'yes']))
190         client = arvados.api('v1',
191                              host=cfg['ARVADOS_API_HOST'],
192                              token=cfg['ARVADOS_API_TOKEN'],
193                              insecure=api_is_insecure,
194                              model=OrderedJsonModel())
195     else:
196         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
197     return client
198
199 # Check if git is available
200 def check_git_availability():
201     try:
202         arvados.util.run_command(['git', '--help'])
203     except Exception:
204         abort('git command is not available. Please ensure git is installed.')
205
206 # copy_pipeline_instance(pi_uuid, src, dst, args)
207 #
208 #    Copies a pipeline instance identified by pi_uuid from src to dst.
209 #
210 #    If the args.recursive option is set:
211 #      1. Copies all input collections
212 #           * For each component in the pipeline, include all collections
213 #             listed as job dependencies for that component)
214 #      2. Copy docker images
215 #      3. Copy git repositories
216 #      4. Copy the pipeline template
217 #
218 #    The only changes made to the copied pipeline instance are:
219 #      1. The original pipeline instance UUID is preserved in
220 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
221 #      2. The pipeline_template_uuid is changed to the new template uuid.
222 #      3. The owner_uuid of the instance is changed to the user who
223 #         copied it.
224 #
225 def copy_pipeline_instance(pi_uuid, src, dst, args):
226     # Fetch the pipeline instance record.
227     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
228
229     if args.recursive:
230         check_git_availability()
231
232         if not args.dst_git_repo:
233             abort('--dst-git-repo is required when copying a pipeline recursively.')
234         # Copy the pipeline template and save the copied template.
235         if pi.get('pipeline_template_uuid', None):
236             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
237                                         src, dst, args)
238
239         # Copy input collections, docker images and git repos.
240         pi = copy_collections(pi, src, dst, args)
241         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
242         copy_docker_images(pi, src, dst, args)
243
244         # Update the fields of the pipeline instance with the copied
245         # pipeline template.
246         if pi.get('pipeline_template_uuid', None):
247             pi['pipeline_template_uuid'] = pt['uuid']
248
249     else:
250         # not recursive
251         logger.info("Copying only pipeline instance %s.", pi_uuid)
252         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
253
254     # Update the pipeline instance properties, and create the new
255     # instance at dst.
256     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
257     pi['description'] = "Pipeline copied from {}\n\n{}".format(
258         pi_uuid,
259         pi['description'] if pi.get('description', None) else '')
260
261     pi['owner_uuid'] = args.project_uuid
262
263     del pi['uuid']
264
265     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
266     return new_pi
267
268 # copy_pipeline_template(pt_uuid, src, dst, args)
269 #
270 #    Copies a pipeline template identified by pt_uuid from src to dst.
271 #
272 #    If args.recursive is True, also copy any collections, docker
273 #    images and git repositories that this template references.
274 #
275 #    The owner_uuid of the new template is changed to that of the user
276 #    who copied the template.
277 #
278 #    Returns the copied pipeline template object.
279 #
280 def copy_pipeline_template(pt_uuid, src, dst, args):
281     # fetch the pipeline template from the source instance
282     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
283
284     if args.recursive:
285         check_git_availability()
286
287         if not args.dst_git_repo:
288             abort('--dst-git-repo is required when copying a pipeline recursively.')
289         # Copy input collections, docker images and git repos.
290         pt = copy_collections(pt, src, dst, args)
291         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
292         copy_docker_images(pt, src, dst, args)
293
294     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
295         pt_uuid,
296         pt['description'] if pt.get('description', None) else '')
297     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
298     del pt['uuid']
299
300     pt['owner_uuid'] = args.project_uuid
301
302     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
303
304 # copy_collections(obj, src, dst, args)
305 #
306 #    Recursively copies all collections referenced by 'obj' from src
307 #    to dst.  obj may be a dict or a list, in which case we run
308 #    copy_collections on every value it contains. If it is a string,
309 #    search it for any substring that matches a collection hash or uuid
310 #    (this will find hidden references to collections like
311 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
312 #
313 #    Returns a copy of obj with any old collection uuids replaced by
314 #    the new ones.
315 #
316 def copy_collections(obj, src, dst, args):
317
318     def copy_collection_fn(collection_match):
319         """Helper function for regex substitution: copies a single collection,
320         identified by the collection_match MatchObject, to the
321         destination.  Returns the destination collection uuid (or the
322         portable data hash if that's what src_id is).
323
324         """
325         src_id = collection_match.group(0)
326         if src_id not in collections_copied:
327             dst_col = copy_collection(src_id, src, dst, args)
328             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
329                 collections_copied[src_id] = src_id
330             else:
331                 collections_copied[src_id] = dst_col['uuid']
332         return collections_copied[src_id]
333
334     if isinstance(obj, basestring):
335         # Copy any collections identified in this string to dst, replacing
336         # them with the dst uuids as necessary.
337         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
338         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
339         return obj
340     elif isinstance(obj, dict):
341         return type(obj)((v, copy_collections(obj[v], src, dst, args))
342                          for v in obj)
343     elif isinstance(obj, list):
344         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
345     return obj
346
347 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
348     """Copy a job's script to the destination repository, and update its record.
349
350     Given a jobspec dictionary, this function finds the referenced script from
351     src and copies it to dst and dst_repo.  It also updates jobspec in place to
352     refer to names on the destination.
353     """
354     repo = jobspec.get('repository')
355     if repo is None:
356         return
357     # script_version is the "script_version" parameter from the source
358     # component or job.  If no script_version was supplied in the
359     # component or job, it is a mistake in the pipeline, but for the
360     # purposes of copying the repository, default to "master".
361     script_version = jobspec.get('script_version') or 'master'
362     script_key = (repo, script_version)
363     if script_key not in scripts_copied:
364         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
365         scripts_copied.add(script_key)
366     jobspec['repository'] = dst_repo
367     repo_dir = local_repo_dir[repo]
368     for version_key in ['script_version', 'supplied_script_version']:
369         if version_key in jobspec:
370             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
371
372 # copy_git_repos(p, src, dst, dst_repo, args)
373 #
374 #    Copies all git repositories referenced by pipeline instance or
375 #    template 'p' from src to dst.
376 #
377 #    For each component c in the pipeline:
378 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
379 #      * Rename script versions:
380 #          * c['script_version']
381 #          * c['job']['script_version']
382 #          * c['job']['supplied_script_version']
383 #        to the commit hashes they resolve to, since any symbolic
384 #        names (tags, branches) are not preserved in the destination repo.
385 #
386 #    The pipeline object is updated in place with the new repository
387 #    names.  The return value is undefined.
388 #
389 def copy_git_repos(p, src, dst, dst_repo, args):
390     for component in p['components'].itervalues():
391         migrate_jobspec(component, src, dst, dst_repo, args)
392         if 'job' in component:
393             migrate_jobspec(component['job'], src, dst, dst_repo, args)
394
395 def total_collection_size(manifest_text):
396     """Return the total number of bytes in this collection (excluding
397     duplicate blocks)."""
398
399     total_bytes = 0
400     locators_seen = {}
401     for line in manifest_text.splitlines():
402         words = line.split()
403         for word in words[1:]:
404             try:
405                 loc = arvados.KeepLocator(word)
406             except ValueError:
407                 continue  # this word isn't a locator, skip it
408             if loc.md5sum not in locators_seen:
409                 locators_seen[loc.md5sum] = True
410                 total_bytes += loc.size
411
412     return total_bytes
413
414 def create_collection_from(c, src, dst, args):
415     """Create a new collection record on dst, and copy Docker metadata if
416     available."""
417
418     collection_uuid = c['uuid']
419     del c['uuid']
420
421     if not c["name"]:
422         c['name'] = "copied from " + collection_uuid
423
424     if 'properties' in c:
425         del c['properties']
426
427     c['owner_uuid'] = args.project_uuid
428
429     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
430
431     # Create docker_image_repo+tag and docker_image_hash links
432     # at the destination.
433     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
434         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
435
436         for src_link in docker_links:
437             body = {key: src_link[key]
438                     for key in ['link_class', 'name', 'properties']}
439             body['head_uuid'] = dst_collection['uuid']
440             body['owner_uuid'] = args.project_uuid
441
442             lk = dst.links().create(body=body).execute(num_retries=args.retries)
443             logger.debug('created dst link {}'.format(lk))
444
445     return dst_collection
446
447 # copy_collection(obj_uuid, src, dst, args)
448 #
449 #    Copies the collection identified by obj_uuid from src to dst.
450 #    Returns the collection object created at dst.
451 #
452 #    If args.progress is True, produce a human-friendly progress
453 #    report.
454 #
455 #    If a collection with the desired portable_data_hash already
456 #    exists at dst, and args.force is False, copy_collection returns
457 #    the existing collection without copying any blocks.  Otherwise
458 #    (if no collection exists or if args.force is True)
459 #    copy_collection copies all of the collection data blocks from src
460 #    to dst.
461 #
462 #    For this application, it is critical to preserve the
463 #    collection's manifest hash, which is not guaranteed with the
464 #    arvados.CollectionReader and arvados.CollectionWriter classes.
465 #    Copying each block in the collection manually, followed by
466 #    the manifest block, ensures that the collection's manifest
467 #    hash will not change.
468 #
469 def copy_collection(obj_uuid, src, dst, args):
470     if arvados.util.keep_locator_pattern.match(obj_uuid):
471         # If the obj_uuid is a portable data hash, it might not be uniquely
472         # identified with a particular collection.  As a result, it is
473         # ambigious as to what name to use for the copy.  Apply some heuristics
474         # to pick which collection to get the name from.
475         srccol = src.collections().list(
476             filters=[['portable_data_hash', '=', obj_uuid]],
477             order="created_at asc"
478             ).execute(num_retries=args.retries)
479
480         items = srccol.get("items")
481
482         if not items:
483             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
484             return
485
486         c = None
487
488         if len(items) == 1:
489             # There's only one collection with the PDH, so use that.
490             c = items[0]
491         if not c:
492             # See if there is a collection that's in the same project
493             # as the root item (usually a pipeline) being copied.
494             for i in items:
495                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
496                     c = i
497                     break
498         if not c:
499             # Didn't find any collections located in the same project, so
500             # pick the oldest collection that has a name assigned to it.
501             for i in items:
502                 if i.get("name"):
503                     c = i
504                     break
505         if not c:
506             # None of the collections have names (?!), so just pick the
507             # first one.
508             c = items[0]
509
510         # list() doesn't return manifest text (and we don't want it to,
511         # because we don't need the same maninfest text sent to us 50
512         # times) so go and retrieve the collection object directly
513         # which will include the manifest text.
514         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
515     else:
516         # Assume this is an actual collection uuid, so fetch it directly.
517         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
518
519     # If a collection with this hash already exists at the
520     # destination, and 'force' is not true, just return that
521     # collection.
522     if not args.force:
523         if 'portable_data_hash' in c:
524             colhash = c['portable_data_hash']
525         else:
526             colhash = c['uuid']
527         dstcol = dst.collections().list(
528             filters=[['portable_data_hash', '=', colhash]]
529         ).execute(num_retries=args.retries)
530         if dstcol['items_available'] > 0:
531             for d in dstcol['items']:
532                 if ((args.project_uuid == d['owner_uuid']) and
533                     (c.get('name') == d['name']) and
534                     (c['portable_data_hash'] == d['portable_data_hash'])):
535                     return d
536             c['manifest_text'] = dst.collections().get(
537                 uuid=dstcol['items'][0]['uuid']
538             ).execute(num_retries=args.retries)['manifest_text']
539             return create_collection_from(c, src, dst, args)
540
541     # Fetch the collection's manifest.
542     manifest = c['manifest_text']
543     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
544
545     # Copy each block from src_keep to dst_keep.
546     # Use the newly signed locators returned from dst_keep to build
547     # a new manifest as we go.
548     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
549     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
550     dst_manifest = ""
551     dst_locators = {}
552     bytes_written = 0
553     bytes_expected = total_collection_size(manifest)
554     if args.progress:
555         progress_writer = ProgressWriter(human_progress)
556     else:
557         progress_writer = None
558
559     for line in manifest.splitlines():
560         words = line.split()
561         dst_manifest += words[0]
562         for word in words[1:]:
563             try:
564                 loc = arvados.KeepLocator(word)
565             except ValueError:
566                 # If 'word' can't be parsed as a locator,
567                 # presume it's a filename.
568                 dst_manifest += ' ' + word
569                 continue
570             blockhash = loc.md5sum
571             # copy this block if we haven't seen it before
572             # (otherwise, just reuse the existing dst_locator)
573             if blockhash not in dst_locators:
574                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
575                 if progress_writer:
576                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
577                 data = src_keep.get(word)
578                 dst_locator = dst_keep.put(data)
579                 dst_locators[blockhash] = dst_locator
580                 bytes_written += loc.size
581             dst_manifest += ' ' + dst_locators[blockhash]
582         dst_manifest += "\n"
583
584     if progress_writer:
585         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
586         progress_writer.finish()
587
588     # Copy the manifest and save the collection.
589     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
590
591     c['manifest_text'] = dst_manifest
592     return create_collection_from(c, src, dst, args)
593
594 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
595     r = api.repositories().list(
596         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
597     if r['items_available'] != 1:
598         raise Exception('cannot identify repo {}; {} repos found'
599                         .format(repo_name, r['items_available']))
600
601     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
602     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
603     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
604
605     priority = https_url + other_url + http_url
606
607     git_config = []
608     git_url = None
609     for url in priority:
610         if url.startswith("http"):
611             u = urlparse.urlsplit(url)
612             baseurl = urlparse.urlunsplit((u.scheme, u.netloc, "", "", ""))
613             git_config = ["-c", "credential.%s/.username=none" % baseurl,
614                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
615         else:
616             git_config = []
617
618         try:
619             logger.debug("trying %s", url)
620             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
621                                       env={"HOME": os.environ["HOME"],
622                                            "ARVADOS_API_TOKEN": api.api_token,
623                                            "GIT_ASKPASS": "/bin/false"})
624         except arvados.errors.CommandFailedError:
625             pass
626         else:
627             git_url = url
628             break
629
630     if not git_url:
631         raise Exception('Cannot access git repository, tried {}'
632                         .format(priority))
633
634     if git_url.startswith("http:"):
635         if allow_insecure_http:
636             logger.warn("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
637         else:
638             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
639
640     return (git_url, git_config)
641
642
643 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
644 #
645 #    Copies commits from git repository 'src_git_repo' on Arvados
646 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
647 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
648 #    or "jsmith")
649 #
650 #    All commits will be copied to a destination branch named for the
651 #    source repository URL.
652 #
653 #    The destination repository must already exist.
654 #
655 #    The user running this command must be authenticated
656 #    to both repositories.
657 #
658 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
659     # Identify the fetch and push URLs for the git repositories.
660
661     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
662     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
663
664     logger.debug('src_git_url: {}'.format(src_git_url))
665     logger.debug('dst_git_url: {}'.format(dst_git_url))
666
667     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
668
669     # Copy git commits from src repo to dst repo.
670     if src_git_repo not in local_repo_dir:
671         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
672         arvados.util.run_command(
673             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
674              local_repo_dir[src_git_repo]],
675             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
676             env={"HOME": os.environ["HOME"],
677                  "ARVADOS_API_TOKEN": src.api_token,
678                  "GIT_ASKPASS": "/bin/false"})
679         arvados.util.run_command(
680             ["git", "remote", "add", "dst", dst_git_url],
681             cwd=local_repo_dir[src_git_repo])
682     arvados.util.run_command(
683         ["git", "branch", dst_branch, script_version],
684         cwd=local_repo_dir[src_git_repo])
685     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
686                              cwd=local_repo_dir[src_git_repo],
687                              env={"HOME": os.environ["HOME"],
688                                   "ARVADOS_API_TOKEN": dst.api_token,
689                                   "GIT_ASKPASS": "/bin/false"})
690
691 def copy_docker_images(pipeline, src, dst, args):
692     """Copy any docker images named in the pipeline components'
693     runtime_constraints field from src to dst."""
694
695     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
696     for c_name, c_info in pipeline['components'].iteritems():
697         if ('runtime_constraints' in c_info and
698             'docker_image' in c_info['runtime_constraints']):
699             copy_docker_image(
700                 c_info['runtime_constraints']['docker_image'],
701                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
702                 src, dst, args)
703
704
705 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
706     """Copy the docker image identified by docker_image and
707     docker_image_tag from src to dst. Create appropriate
708     docker_image_repo+tag and docker_image_hash links at dst.
709
710     """
711
712     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
713
714     # Find the link identifying this docker image.
715     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
716         src, args.retries, docker_image, docker_image_tag)
717     if docker_image_list:
718         image_uuid, image_info = docker_image_list[0]
719         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
720
721         # Copy the collection it refers to.
722         dst_image_col = copy_collection(image_uuid, src, dst, args)
723     elif arvados.util.keep_locator_pattern.match(docker_image):
724         dst_image_col = copy_collection(docker_image, src, dst, args)
725     else:
726         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
727
728 # git_rev_parse(rev, repo)
729 #
730 #    Returns the 40-character commit hash corresponding to 'rev' in
731 #    git repository 'repo' (which must be the path of a local git
732 #    repository)
733 #
734 def git_rev_parse(rev, repo):
735     gitout, giterr = arvados.util.run_command(
736         ['git', 'rev-parse', rev], cwd=repo)
737     return gitout.strip()
738
739 # uuid_type(api, object_uuid)
740 #
741 #    Returns the name of the class that object_uuid belongs to, based on
742 #    the second field of the uuid.  This function consults the api's
743 #    schema to identify the object class.
744 #
745 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
746 #
747 #    Special case: if handed a Keep locator hash, return 'Collection'.
748 #
749 def uuid_type(api, object_uuid):
750     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
751         return 'Collection'
752     p = object_uuid.split('-')
753     if len(p) == 3:
754         type_prefix = p[1]
755         for k in api._schema.schemas:
756             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
757             if type_prefix == obj_class:
758                 return k
759     return None
760
761 def abort(msg, code=1):
762     logger.info("arv-copy: %s", msg)
763     exit(code)
764
765
766 # Code for reporting on the progress of a collection upload.
767 # Stolen from arvados.commands.put.ArvPutCollectionWriter
768 # TODO(twp): figure out how to refactor into a shared library
769 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
770 # code)
771
772 def machine_progress(obj_uuid, bytes_written, bytes_expected):
773     return "{} {}: {} {} written {} total\n".format(
774         sys.argv[0],
775         os.getpid(),
776         obj_uuid,
777         bytes_written,
778         -1 if (bytes_expected is None) else bytes_expected)
779
780 def human_progress(obj_uuid, bytes_written, bytes_expected):
781     if bytes_expected:
782         return "\r{}: {}M / {}M {:.1%} ".format(
783             obj_uuid,
784             bytes_written >> 20, bytes_expected >> 20,
785             float(bytes_written) / bytes_expected)
786     else:
787         return "\r{}: {} ".format(obj_uuid, bytes_written)
788
789 class ProgressWriter(object):
790     _progress_func = None
791     outfile = sys.stderr
792
793     def __init__(self, progress_func):
794         self._progress_func = progress_func
795
796     def report(self, obj_uuid, bytes_written, bytes_expected):
797         if self._progress_func is not None:
798             self.outfile.write(
799                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
800
801     def finish(self):
802         self.outfile.write("\n")
803
804 if __name__ == '__main__':
805     main()