6605: arv-copy preferentially uses HTTP git urls.
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27 import urlparse
28
29 import arvados
30 import arvados.config
31 import arvados.keep
32 import arvados.util
33 import arvados.commands._util as arv_cmd
34 import arvados.commands.keepdocker
35
36 from arvados.api import OrderedJsonModel
37
38 logger = logging.getLogger('arvados.arv-copy')
39
40 # local_repo_dir records which git repositories from the Arvados source
41 # instance have been checked out locally during this run, and to which
42 # directories.
43 # e.g. if repository 'twp' from src_arv has been cloned into
44 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
45 #
46 local_repo_dir = {}
47
48 # List of collections that have been copied in this session, and their
49 # destination collection UUIDs.
50 collections_copied = {}
51
52 # Set of (repository, script_version) two-tuples of commits copied in git.
53 scripts_copied = set()
54
55 # The owner_uuid of the object being copied
56 src_owner_uuid = None
57
58 def main():
59     copy_opts = argparse.ArgumentParser(add_help=False)
60
61     copy_opts.add_argument(
62         '-v', '--verbose', dest='verbose', action='store_true',
63         help='Verbose output.')
64     copy_opts.add_argument(
65         '--progress', dest='progress', action='store_true',
66         help='Report progress on copying collections. (default)')
67     copy_opts.add_argument(
68         '--no-progress', dest='progress', action='store_false',
69         help='Do not report progress on copying collections.')
70     copy_opts.add_argument(
71         '-f', '--force', dest='force', action='store_true',
72         help='Perform copy even if the object appears to exist at the remote destination.')
73     copy_opts.add_argument(
74         '--src', dest='source_arvados', required=True,
75         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
76     copy_opts.add_argument(
77         '--dst', dest='destination_arvados', required=True,
78         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
79     copy_opts.add_argument(
80         '--recursive', dest='recursive', action='store_true',
81         help='Recursively copy any dependencies for this object. (default)')
82     copy_opts.add_argument(
83         '--no-recursive', dest='recursive', action='store_false',
84         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
85     copy_opts.add_argument(
86         '--dst-git-repo', dest='dst_git_repo',
87         help='The name of the destination git repository. Required when copying a pipeline recursively.')
88     copy_opts.add_argument(
89         '--project-uuid', dest='project_uuid',
90         help='The UUID of the project at the destination to which the pipeline should be copied.')
91     copy_opts.add_argument(
92         'object_uuid',
93         help='The UUID of the object to be copied.')
94     copy_opts.set_defaults(progress=True)
95     copy_opts.set_defaults(recursive=True)
96
97     parser = argparse.ArgumentParser(
98         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
99         parents=[copy_opts, arv_cmd.retry_opt])
100     args = parser.parse_args()
101
102     if args.verbose:
103         logger.setLevel(logging.DEBUG)
104     else:
105         logger.setLevel(logging.INFO)
106
107     # Create API clients for the source and destination instances
108     src_arv = api_for_instance(args.source_arvados)
109     dst_arv = api_for_instance(args.destination_arvados)
110
111     if not args.project_uuid:
112         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
113
114     # Identify the kind of object we have been given, and begin copying.
115     t = uuid_type(src_arv, args.object_uuid)
116     if t == 'Collection':
117         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
118         result = copy_collection(args.object_uuid,
119                                  src_arv, dst_arv,
120                                  args)
121     elif t == 'PipelineInstance':
122         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
123         result = copy_pipeline_instance(args.object_uuid,
124                                         src_arv, dst_arv,
125                                         args)
126     elif t == 'PipelineTemplate':
127         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
128         result = copy_pipeline_template(args.object_uuid,
129                                         src_arv, dst_arv, args)
130     else:
131         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
132
133     # Clean up any outstanding temp git repositories.
134     for d in local_repo_dir.values():
135         shutil.rmtree(d, ignore_errors=True)
136
137     # If no exception was thrown and the response does not have an
138     # error_token field, presume success
139     if 'error_token' in result or 'uuid' not in result:
140         logger.error("API server returned an error result: {}".format(result))
141         exit(1)
142
143     logger.info("")
144     logger.info("Success: created copy with uuid {}".format(result['uuid']))
145     exit(0)
146
147 def set_src_owner_uuid(resource, uuid, args):
148     global src_owner_uuid
149     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
150     src_owner_uuid = c.get("owner_uuid")
151
152 # api_for_instance(instance_name)
153 #
154 #     Creates an API client for the Arvados instance identified by
155 #     instance_name.
156 #
157 #     If instance_name contains a slash, it is presumed to be a path
158 #     (either local or absolute) to a file with Arvados configuration
159 #     settings.
160 #
161 #     Otherwise, it is presumed to be the name of a file in
162 #     $HOME/.config/arvados/instance_name.conf
163 #
164 def api_for_instance(instance_name):
165     if '/' in instance_name:
166         config_file = instance_name
167     else:
168         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
169
170     try:
171         cfg = arvados.config.load(config_file)
172     except (IOError, OSError) as e:
173         abort(("Could not open config file {}: {}\n" +
174                "You must make sure that your configuration tokens\n" +
175                "for Arvados instance {} are in {} and that this\n" +
176                "file is readable.").format(
177                    config_file, e, instance_name, config_file))
178
179     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
180         api_is_insecure = (
181             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
182                 ['1', 't', 'true', 'y', 'yes']))
183         client = arvados.api('v1',
184                              host=cfg['ARVADOS_API_HOST'],
185                              token=cfg['ARVADOS_API_TOKEN'],
186                              insecure=api_is_insecure,
187                              model=OrderedJsonModel())
188     else:
189         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
190     return client
191
192 # Check if git is available
193 def check_git_availability():
194     try:
195         arvados.util.run_command(['git', '--help'])
196     except Exception:
197         abort('git command is not available. Please ensure git is installed.')
198
199 # copy_pipeline_instance(pi_uuid, src, dst, args)
200 #
201 #    Copies a pipeline instance identified by pi_uuid from src to dst.
202 #
203 #    If the args.recursive option is set:
204 #      1. Copies all input collections
205 #           * For each component in the pipeline, include all collections
206 #             listed as job dependencies for that component)
207 #      2. Copy docker images
208 #      3. Copy git repositories
209 #      4. Copy the pipeline template
210 #
211 #    The only changes made to the copied pipeline instance are:
212 #      1. The original pipeline instance UUID is preserved in
213 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
214 #      2. The pipeline_template_uuid is changed to the new template uuid.
215 #      3. The owner_uuid of the instance is changed to the user who
216 #         copied it.
217 #
218 def copy_pipeline_instance(pi_uuid, src, dst, args):
219     # Fetch the pipeline instance record.
220     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
221
222     if args.recursive:
223         check_git_availability()
224
225         if not args.dst_git_repo:
226             abort('--dst-git-repo is required when copying a pipeline recursively.')
227         # Copy the pipeline template and save the copied template.
228         if pi.get('pipeline_template_uuid', None):
229             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
230                                         src, dst, args)
231
232         # Copy input collections, docker images and git repos.
233         pi = copy_collections(pi, src, dst, args)
234         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
235         copy_docker_images(pi, src, dst, args)
236
237         # Update the fields of the pipeline instance with the copied
238         # pipeline template.
239         if pi.get('pipeline_template_uuid', None):
240             pi['pipeline_template_uuid'] = pt['uuid']
241
242     else:
243         # not recursive
244         logger.info("Copying only pipeline instance %s.", pi_uuid)
245         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
246
247     # Update the pipeline instance properties, and create the new
248     # instance at dst.
249     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
250     pi['description'] = "Pipeline copied from {}\n\n{}".format(
251         pi_uuid,
252         pi['description'] if pi.get('description', None) else '')
253
254     pi['owner_uuid'] = args.project_uuid
255
256     del pi['uuid']
257
258     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
259     return new_pi
260
261 # copy_pipeline_template(pt_uuid, src, dst, args)
262 #
263 #    Copies a pipeline template identified by pt_uuid from src to dst.
264 #
265 #    If args.recursive is True, also copy any collections, docker
266 #    images and git repositories that this template references.
267 #
268 #    The owner_uuid of the new template is changed to that of the user
269 #    who copied the template.
270 #
271 #    Returns the copied pipeline template object.
272 #
273 def copy_pipeline_template(pt_uuid, src, dst, args):
274     # fetch the pipeline template from the source instance
275     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
276
277     if args.recursive:
278         check_git_availability()
279
280         if not args.dst_git_repo:
281             abort('--dst-git-repo is required when copying a pipeline recursively.')
282         # Copy input collections, docker images and git repos.
283         pt = copy_collections(pt, src, dst, args)
284         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
285         copy_docker_images(pt, src, dst, args)
286
287     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
288         pt_uuid,
289         pt['description'] if pt.get('description', None) else '')
290     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
291     del pt['uuid']
292
293     pt['owner_uuid'] = args.project_uuid
294
295     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
296
297 # copy_collections(obj, src, dst, args)
298 #
299 #    Recursively copies all collections referenced by 'obj' from src
300 #    to dst.  obj may be a dict or a list, in which case we run
301 #    copy_collections on every value it contains. If it is a string,
302 #    search it for any substring that matches a collection hash or uuid
303 #    (this will find hidden references to collections like
304 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
305 #
306 #    Returns a copy of obj with any old collection uuids replaced by
307 #    the new ones.
308 #
309 def copy_collections(obj, src, dst, args):
310
311     def copy_collection_fn(collection_match):
312         """Helper function for regex substitution: copies a single collection,
313         identified by the collection_match MatchObject, to the
314         destination.  Returns the destination collection uuid (or the
315         portable data hash if that's what src_id is).
316
317         """
318         src_id = collection_match.group(0)
319         if src_id not in collections_copied:
320             dst_col = copy_collection(src_id, src, dst, args)
321             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
322                 collections_copied[src_id] = src_id
323             else:
324                 collections_copied[src_id] = dst_col['uuid']
325         return collections_copied[src_id]
326
327     if isinstance(obj, basestring):
328         # Copy any collections identified in this string to dst, replacing
329         # them with the dst uuids as necessary.
330         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
331         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
332         return obj
333     elif isinstance(obj, dict):
334         return type(obj)((v, copy_collections(obj[v], src, dst, args))
335                          for v in obj)
336     elif isinstance(obj, list):
337         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
338     return obj
339
340 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
341     """Copy a job's script to the destination repository, and update its record.
342
343     Given a jobspec dictionary, this function finds the referenced script from
344     src and copies it to dst and dst_repo.  It also updates jobspec in place to
345     refer to names on the destination.
346     """
347     repo = jobspec.get('repository')
348     if repo is None:
349         return
350     # script_version is the "script_version" parameter from the source
351     # component or job.  If no script_version was supplied in the
352     # component or job, it is a mistake in the pipeline, but for the
353     # purposes of copying the repository, default to "master".
354     script_version = jobspec.get('script_version') or 'master'
355     script_key = (repo, script_version)
356     if script_key not in scripts_copied:
357         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
358         scripts_copied.add(script_key)
359     jobspec['repository'] = dst_repo
360     repo_dir = local_repo_dir[repo]
361     for version_key in ['script_version', 'supplied_script_version']:
362         if version_key in jobspec:
363             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
364
365 # copy_git_repos(p, src, dst, dst_repo, args)
366 #
367 #    Copies all git repositories referenced by pipeline instance or
368 #    template 'p' from src to dst.
369 #
370 #    For each component c in the pipeline:
371 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
372 #      * Rename script versions:
373 #          * c['script_version']
374 #          * c['job']['script_version']
375 #          * c['job']['supplied_script_version']
376 #        to the commit hashes they resolve to, since any symbolic
377 #        names (tags, branches) are not preserved in the destination repo.
378 #
379 #    The pipeline object is updated in place with the new repository
380 #    names.  The return value is undefined.
381 #
382 def copy_git_repos(p, src, dst, dst_repo, args):
383     for component in p['components'].itervalues():
384         migrate_jobspec(component, src, dst, dst_repo, args)
385         if 'job' in component:
386             migrate_jobspec(component['job'], src, dst, dst_repo, args)
387
388 def total_collection_size(manifest_text):
389     """Return the total number of bytes in this collection (excluding
390     duplicate blocks)."""
391
392     total_bytes = 0
393     locators_seen = {}
394     for line in manifest_text.splitlines():
395         words = line.split()
396         for word in words[1:]:
397             try:
398                 loc = arvados.KeepLocator(word)
399             except ValueError:
400                 continue  # this word isn't a locator, skip it
401             if loc.md5sum not in locators_seen:
402                 locators_seen[loc.md5sum] = True
403                 total_bytes += loc.size
404
405     return total_bytes
406
407 def create_collection_from(c, src, dst, args):
408     """Create a new collection record on dst, and copy Docker metadata if
409     available."""
410
411     collection_uuid = c['uuid']
412     del c['uuid']
413
414     if not c["name"]:
415         c['name'] = "copied from " + collection_uuid
416
417     if 'properties' in c:
418         del c['properties']
419
420     c['owner_uuid'] = args.project_uuid
421
422     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
423
424     # Create docker_image_repo+tag and docker_image_hash links
425     # at the destination.
426     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
427         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
428
429         for src_link in docker_links:
430             body = {key: src_link[key]
431                     for key in ['link_class', 'name', 'properties']}
432             body['head_uuid'] = dst_collection['uuid']
433             body['owner_uuid'] = args.project_uuid
434
435             lk = dst.links().create(body=body).execute(num_retries=args.retries)
436             logger.debug('created dst link {}'.format(lk))
437
438     return dst_collection
439
440 # copy_collection(obj_uuid, src, dst, args)
441 #
442 #    Copies the collection identified by obj_uuid from src to dst.
443 #    Returns the collection object created at dst.
444 #
445 #    If args.progress is True, produce a human-friendly progress
446 #    report.
447 #
448 #    If a collection with the desired portable_data_hash already
449 #    exists at dst, and args.force is False, copy_collection returns
450 #    the existing collection without copying any blocks.  Otherwise
451 #    (if no collection exists or if args.force is True)
452 #    copy_collection copies all of the collection data blocks from src
453 #    to dst.
454 #
455 #    For this application, it is critical to preserve the
456 #    collection's manifest hash, which is not guaranteed with the
457 #    arvados.CollectionReader and arvados.CollectionWriter classes.
458 #    Copying each block in the collection manually, followed by
459 #    the manifest block, ensures that the collection's manifest
460 #    hash will not change.
461 #
462 def copy_collection(obj_uuid, src, dst, args):
463     if arvados.util.keep_locator_pattern.match(obj_uuid):
464         # If the obj_uuid is a portable data hash, it might not be uniquely
465         # identified with a particular collection.  As a result, it is
466         # ambigious as to what name to use for the copy.  Apply some heuristics
467         # to pick which collection to get the name from.
468         srccol = src.collections().list(
469             filters=[['portable_data_hash', '=', obj_uuid]],
470             order="created_at asc"
471             ).execute(num_retries=args.retries)
472
473         items = srccol.get("items")
474
475         if not items:
476             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
477             return
478
479         c = None
480
481         if len(items) == 1:
482             # There's only one collection with the PDH, so use that.
483             c = items[0]
484         if not c:
485             # See if there is a collection that's in the same project
486             # as the root item (usually a pipeline) being copied.
487             for i in items:
488                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
489                     c = i
490                     break
491         if not c:
492             # Didn't find any collections located in the same project, so
493             # pick the oldest collection that has a name assigned to it.
494             for i in items:
495                 if i.get("name"):
496                     c = i
497                     break
498         if not c:
499             # None of the collections have names (?!), so just pick the
500             # first one.
501             c = items[0]
502
503         # list() doesn't return manifest text (and we don't want it to,
504         # because we don't need the same maninfest text sent to us 50
505         # times) so go and retrieve the collection object directly
506         # which will include the manifest text.
507         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
508     else:
509         # Assume this is an actual collection uuid, so fetch it directly.
510         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
511
512     # If a collection with this hash already exists at the
513     # destination, and 'force' is not true, just return that
514     # collection.
515     if not args.force:
516         if 'portable_data_hash' in c:
517             colhash = c['portable_data_hash']
518         else:
519             colhash = c['uuid']
520         dstcol = dst.collections().list(
521             filters=[['portable_data_hash', '=', colhash]]
522         ).execute(num_retries=args.retries)
523         if dstcol['items_available'] > 0:
524             for d in dstcol['items']:
525                 if ((args.project_uuid == d['owner_uuid']) and
526                     (c.get('name') == d['name']) and
527                     (c['portable_data_hash'] == d['portable_data_hash'])):
528                     return d
529             c['manifest_text'] = dst.collections().get(
530                 uuid=dstcol['items'][0]['uuid']
531             ).execute(num_retries=args.retries)['manifest_text']
532             return create_collection_from(c, src, dst, args)
533
534     # Fetch the collection's manifest.
535     manifest = c['manifest_text']
536     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
537
538     # Copy each block from src_keep to dst_keep.
539     # Use the newly signed locators returned from dst_keep to build
540     # a new manifest as we go.
541     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
542     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
543     dst_manifest = ""
544     dst_locators = {}
545     bytes_written = 0
546     bytes_expected = total_collection_size(manifest)
547     if args.progress:
548         progress_writer = ProgressWriter(human_progress)
549     else:
550         progress_writer = None
551
552     for line in manifest.splitlines():
553         words = line.split()
554         dst_manifest += words[0]
555         for word in words[1:]:
556             try:
557                 loc = arvados.KeepLocator(word)
558             except ValueError:
559                 # If 'word' can't be parsed as a locator,
560                 # presume it's a filename.
561                 dst_manifest += ' ' + word
562                 continue
563             blockhash = loc.md5sum
564             # copy this block if we haven't seen it before
565             # (otherwise, just reuse the existing dst_locator)
566             if blockhash not in dst_locators:
567                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
568                 if progress_writer:
569                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
570                 data = src_keep.get(word)
571                 dst_locator = dst_keep.put(data)
572                 dst_locators[blockhash] = dst_locator
573                 bytes_written += loc.size
574             dst_manifest += ' ' + dst_locators[blockhash]
575         dst_manifest += "\n"
576
577     if progress_writer:
578         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
579         progress_writer.finish()
580
581     # Copy the manifest and save the collection.
582     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
583
584     c['manifest_text'] = dst_manifest
585     return create_collection_from(c, src, dst, args)
586
587 def setup_git_http(url):
588     u = urlparse.urlsplit(url)
589     url = urlparse.urlunsplit((u.scheme, u.netloc, "", "", ""))
590     arvados.util.run_command(["git",
591                               "config",
592                               "--global",
593                               "credential.%s/.username" % url,
594                               "none"])
595     arvados.util.run_command(["git",
596                               "config",
597                               "--global",
598                               "credential.%s/.helper" % url,
599                               """!cred(){ cat >/dev/null; if [ "$1" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred"""])
600
601 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
602 #
603 #    Copies commits from git repository 'src_git_repo' on Arvados
604 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
605 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
606 #    or "jsmith")
607 #
608 #    All commits will be copied to a destination branch named for the
609 #    source repository URL.
610 #
611 #    The destination repository must already exist.
612 #
613 #    The user running this command must be authenticated
614 #    to both repositories.
615 #
616 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
617     # Identify the fetch and push URLs for the git repositories.
618     r = src.repositories().list(
619         filters=[['name', '=', src_git_repo]]).execute(num_retries=args.retries)
620     if r['items_available'] != 1:
621         raise Exception('cannot identify source repo {}; {} repos found'
622                         .format(src_git_repo, r['items_available']))
623
624     http_url = [c for c in r['items'][0]['clone_urls'] if c.startswith("http")]
625     if http_url:
626         src_git_url = http_url[0]
627         setup_git_http(src_git_url)
628     else:
629         src_git_url = r['items'][0]['fetch_url']
630     logger.debug('src_git_url: {}'.format(src_git_url))
631
632     r = dst.repositories().list(
633         filters=[['name', '=', dst_git_repo]]).execute(num_retries=args.retries)
634     if r['items_available'] != 1:
635         raise Exception('cannot identify destination repo {}; {} repos found'
636                         .format(dst_git_repo, r['items_available']))
637
638     http_url = [c for c in r['items'][0]['clone_urls'] if c.startswith("http")]
639     if http_url:
640         dst_git_push_url = http_url[0]
641         setup_git_http(dst_git_push_url)
642     else:
643         dst_git_push_url = r['items'][0]['push_url']
644
645     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
646
647     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
648
649     # Copy git commits from src repo to dst repo.
650     if src_git_repo not in local_repo_dir:
651         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
652         arvados.util.run_command(
653             ["git", "clone", "--bare", src_git_url,
654              local_repo_dir[src_git_repo]],
655             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
656             env={"HOME": os.environ["HOME"],
657                  "ARVADOS_API_TOKEN": src.api_token})
658         arvados.util.run_command(
659             ["git", "remote", "add", "dst", dst_git_push_url],
660             cwd=local_repo_dir[src_git_repo])
661     arvados.util.run_command(
662         ["git", "branch", dst_branch, script_version],
663         cwd=local_repo_dir[src_git_repo])
664     arvados.util.run_command(["git", "push", "dst", dst_branch],
665                              cwd=local_repo_dir[src_git_repo],
666                              env={"HOME": os.environ["HOME"],
667                                   "ARVADOS_API_TOKEN": dst.api_token})
668
669 def copy_docker_images(pipeline, src, dst, args):
670     """Copy any docker images named in the pipeline components'
671     runtime_constraints field from src to dst."""
672
673     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
674     for c_name, c_info in pipeline['components'].iteritems():
675         if ('runtime_constraints' in c_info and
676             'docker_image' in c_info['runtime_constraints']):
677             copy_docker_image(
678                 c_info['runtime_constraints']['docker_image'],
679                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
680                 src, dst, args)
681
682
683 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
684     """Copy the docker image identified by docker_image and
685     docker_image_tag from src to dst. Create appropriate
686     docker_image_repo+tag and docker_image_hash links at dst.
687
688     """
689
690     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
691
692     # Find the link identifying this docker image.
693     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
694         src, args.retries, docker_image, docker_image_tag)
695     if docker_image_list:
696         image_uuid, image_info = docker_image_list[0]
697         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
698
699         # Copy the collection it refers to.
700         dst_image_col = copy_collection(image_uuid, src, dst, args)
701     elif arvados.util.keep_locator_pattern.match(docker_image):
702         dst_image_col = copy_collection(docker_image, src, dst, args)
703     else:
704         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
705
706 # git_rev_parse(rev, repo)
707 #
708 #    Returns the 40-character commit hash corresponding to 'rev' in
709 #    git repository 'repo' (which must be the path of a local git
710 #    repository)
711 #
712 def git_rev_parse(rev, repo):
713     gitout, giterr = arvados.util.run_command(
714         ['git', 'rev-parse', rev], cwd=repo)
715     return gitout.strip()
716
717 # uuid_type(api, object_uuid)
718 #
719 #    Returns the name of the class that object_uuid belongs to, based on
720 #    the second field of the uuid.  This function consults the api's
721 #    schema to identify the object class.
722 #
723 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
724 #
725 #    Special case: if handed a Keep locator hash, return 'Collection'.
726 #
727 def uuid_type(api, object_uuid):
728     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
729         return 'Collection'
730     p = object_uuid.split('-')
731     if len(p) == 3:
732         type_prefix = p[1]
733         for k in api._schema.schemas:
734             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
735             if type_prefix == obj_class:
736                 return k
737     return None
738
739 def abort(msg, code=1):
740     logger.info("arv-copy: %s", msg)
741     exit(code)
742
743
744 # Code for reporting on the progress of a collection upload.
745 # Stolen from arvados.commands.put.ArvPutCollectionWriter
746 # TODO(twp): figure out how to refactor into a shared library
747 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
748 # code)
749
750 def machine_progress(obj_uuid, bytes_written, bytes_expected):
751     return "{} {}: {} {} written {} total\n".format(
752         sys.argv[0],
753         os.getpid(),
754         obj_uuid,
755         bytes_written,
756         -1 if (bytes_expected is None) else bytes_expected)
757
758 def human_progress(obj_uuid, bytes_written, bytes_expected):
759     if bytes_expected:
760         return "\r{}: {}M / {}M {:.1%} ".format(
761             obj_uuid,
762             bytes_written >> 20, bytes_expected >> 20,
763             float(bytes_written) / bytes_expected)
764     else:
765         return "\r{}: {} ".format(obj_uuid, bytes_written)
766
767 class ProgressWriter(object):
768     _progress_func = None
769     outfile = sys.stderr
770
771     def __init__(self, progress_func):
772         self._progress_func = progress_func
773
774     def report(self, obj_uuid, bytes_written, bytes_expected):
775         if self._progress_func is not None:
776             self.outfile.write(
777                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
778
779     def finish(self):
780         self.outfile.write("\n")
781
782 if __name__ == '__main__':
783     main()