5653: arv-copy copies multiple commits from the same repository+pipeline.
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31 import arvados.util
32 import arvados.commands._util as arv_cmd
33 import arvados.commands.keepdocker
34
35 logger = logging.getLogger('arvados.arv-copy')
36
37 # local_repo_dir records which git repositories from the Arvados source
38 # instance have been checked out locally during this run, and to which
39 # directories.
40 # e.g. if repository 'twp' from src_arv has been cloned into
41 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
42 #
43 local_repo_dir = {}
44
45 # List of collections that have been copied in this session, and their
46 # destination collection UUIDs.
47 collections_copied = {}
48
49 # Set of (repository, script_version) two-tuples of commits copied in git.
50 scripts_copied = set()
51
52 # The owner_uuid of the object being copied
53 src_owner_uuid = None
54
55 def main():
56     copy_opts = argparse.ArgumentParser(add_help=False)
57
58     copy_opts.add_argument(
59         '-v', '--verbose', dest='verbose', action='store_true',
60         help='Verbose output.')
61     copy_opts.add_argument(
62         '--progress', dest='progress', action='store_true',
63         help='Report progress on copying collections. (default)')
64     copy_opts.add_argument(
65         '--no-progress', dest='progress', action='store_false',
66         help='Do not report progress on copying collections.')
67     copy_opts.add_argument(
68         '-f', '--force', dest='force', action='store_true',
69         help='Perform copy even if the object appears to exist at the remote destination.')
70     copy_opts.add_argument(
71         '--src', dest='source_arvados', required=True,
72         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
73     copy_opts.add_argument(
74         '--dst', dest='destination_arvados', required=True,
75         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
76     copy_opts.add_argument(
77         '--recursive', dest='recursive', action='store_true',
78         help='Recursively copy any dependencies for this object. (default)')
79     copy_opts.add_argument(
80         '--no-recursive', dest='recursive', action='store_false',
81         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
82     copy_opts.add_argument(
83         '--dst-git-repo', dest='dst_git_repo',
84         help='The name of the destination git repository. Required when copying a pipeline recursively.')
85     copy_opts.add_argument(
86         '--project-uuid', dest='project_uuid',
87         help='The UUID of the project at the destination to which the pipeline should be copied.')
88     copy_opts.add_argument(
89         'object_uuid',
90         help='The UUID of the object to be copied.')
91     copy_opts.set_defaults(progress=True)
92     copy_opts.set_defaults(recursive=True)
93
94     parser = argparse.ArgumentParser(
95         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
96         parents=[copy_opts, arv_cmd.retry_opt])
97     args = parser.parse_args()
98
99     if args.verbose:
100         logger.setLevel(logging.DEBUG)
101     else:
102         logger.setLevel(logging.INFO)
103
104     # Create API clients for the source and destination instances
105     src_arv = api_for_instance(args.source_arvados)
106     dst_arv = api_for_instance(args.destination_arvados)
107
108     if not args.project_uuid:
109         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
110
111     # Identify the kind of object we have been given, and begin copying.
112     t = uuid_type(src_arv, args.object_uuid)
113     if t == 'Collection':
114         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
115         result = copy_collection(args.object_uuid,
116                                  src_arv, dst_arv,
117                                  args)
118     elif t == 'PipelineInstance':
119         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
120         result = copy_pipeline_instance(args.object_uuid,
121                                         src_arv, dst_arv,
122                                         args)
123     elif t == 'PipelineTemplate':
124         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
125         result = copy_pipeline_template(args.object_uuid,
126                                         src_arv, dst_arv, args)
127     else:
128         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
129
130     # Clean up any outstanding temp git repositories.
131     for d in local_repo_dir.values():
132         shutil.rmtree(d, ignore_errors=True)
133
134     # If no exception was thrown and the response does not have an
135     # error_token field, presume success
136     if 'error_token' in result or 'uuid' not in result:
137         logger.error("API server returned an error result: {}".format(result))
138         exit(1)
139
140     logger.info("")
141     logger.info("Success: created copy with uuid {}".format(result['uuid']))
142     exit(0)
143
144 def set_src_owner_uuid(resource, uuid, args):
145     global src_owner_uuid
146     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
147     src_owner_uuid = c.get("owner_uuid")
148
149 # api_for_instance(instance_name)
150 #
151 #     Creates an API client for the Arvados instance identified by
152 #     instance_name.
153 #
154 #     If instance_name contains a slash, it is presumed to be a path
155 #     (either local or absolute) to a file with Arvados configuration
156 #     settings.
157 #
158 #     Otherwise, it is presumed to be the name of a file in
159 #     $HOME/.config/arvados/instance_name.conf
160 #
161 def api_for_instance(instance_name):
162     if '/' in instance_name:
163         config_file = instance_name
164     else:
165         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
166
167     try:
168         cfg = arvados.config.load(config_file)
169     except (IOError, OSError) as e:
170         abort(("Could not open config file {}: {}\n" +
171                "You must make sure that your configuration tokens\n" +
172                "for Arvados instance {} are in {} and that this\n" +
173                "file is readable.").format(
174                    config_file, e, instance_name, config_file))
175
176     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
177         api_is_insecure = (
178             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
179                 ['1', 't', 'true', 'y', 'yes']))
180         client = arvados.api('v1',
181                              host=cfg['ARVADOS_API_HOST'],
182                              token=cfg['ARVADOS_API_TOKEN'],
183                              insecure=api_is_insecure)
184     else:
185         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
186     return client
187
188 # copy_pipeline_instance(pi_uuid, src, dst, args)
189 #
190 #    Copies a pipeline instance identified by pi_uuid from src to dst.
191 #
192 #    If the args.recursive option is set:
193 #      1. Copies all input collections
194 #           * For each component in the pipeline, include all collections
195 #             listed as job dependencies for that component)
196 #      2. Copy docker images
197 #      3. Copy git repositories
198 #      4. Copy the pipeline template
199 #
200 #    The only changes made to the copied pipeline instance are:
201 #      1. The original pipeline instance UUID is preserved in
202 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
203 #      2. The pipeline_template_uuid is changed to the new template uuid.
204 #      3. The owner_uuid of the instance is changed to the user who
205 #         copied it.
206 #
207 def copy_pipeline_instance(pi_uuid, src, dst, args):
208     # Fetch the pipeline instance record.
209     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
210
211     if args.recursive:
212         if not args.dst_git_repo:
213             abort('--dst-git-repo is required when copying a pipeline recursively.')
214         # Copy the pipeline template and save the copied template.
215         if pi.get('pipeline_template_uuid', None):
216             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
217                                         src, dst, args)
218
219         # Copy input collections, docker images and git repos.
220         pi = copy_collections(pi, src, dst, args)
221         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
222         copy_docker_images(pi, src, dst, args)
223
224         # Update the fields of the pipeline instance with the copied
225         # pipeline template.
226         if pi.get('pipeline_template_uuid', None):
227             pi['pipeline_template_uuid'] = pt['uuid']
228
229     else:
230         # not recursive
231         logger.info("Copying only pipeline instance %s.", pi_uuid)
232         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
233
234     # Update the pipeline instance properties, and create the new
235     # instance at dst.
236     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
237     pi['description'] = "Pipeline copied from {}\n\n{}".format(
238         pi_uuid,
239         pi['description'] if pi.get('description', None) else '')
240
241     pi['owner_uuid'] = args.project_uuid
242
243     del pi['uuid']
244
245     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
246     return new_pi
247
248 # copy_pipeline_template(pt_uuid, src, dst, args)
249 #
250 #    Copies a pipeline template identified by pt_uuid from src to dst.
251 #
252 #    If args.recursive is True, also copy any collections, docker
253 #    images and git repositories that this template references.
254 #
255 #    The owner_uuid of the new template is changed to that of the user
256 #    who copied the template.
257 #
258 #    Returns the copied pipeline template object.
259 #
260 def copy_pipeline_template(pt_uuid, src, dst, args):
261     # fetch the pipeline template from the source instance
262     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
263
264     if args.recursive:
265         if not args.dst_git_repo:
266             abort('--dst-git-repo is required when copying a pipeline recursively.')
267         # Copy input collections, docker images and git repos.
268         pt = copy_collections(pt, src, dst, args)
269         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
270         copy_docker_images(pt, src, dst, args)
271
272     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
273         pt_uuid,
274         pt['description'] if pt.get('description', None) else '')
275     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
276     del pt['uuid']
277
278     pt['owner_uuid'] = args.project_uuid
279
280     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
281
282 # copy_collections(obj, src, dst, args)
283 #
284 #    Recursively copies all collections referenced by 'obj' from src
285 #    to dst.  obj may be a dict or a list, in which case we run
286 #    copy_collections on every value it contains. If it is a string,
287 #    search it for any substring that matches a collection hash or uuid
288 #    (this will find hidden references to collections like
289 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
290 #
291 #    Returns a copy of obj with any old collection uuids replaced by
292 #    the new ones.
293 #
294 def copy_collections(obj, src, dst, args):
295
296     def copy_collection_fn(collection_match):
297         """Helper function for regex substitution: copies a single collection,
298         identified by the collection_match MatchObject, to the
299         destination.  Returns the destination collection uuid (or the
300         portable data hash if that's what src_id is).
301
302         """
303         src_id = collection_match.group(0)
304         if src_id not in collections_copied:
305             dst_col = copy_collection(src_id, src, dst, args)
306             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
307                 collections_copied[src_id] = src_id
308             else:
309                 collections_copied[src_id] = dst_col['uuid']
310         return collections_copied[src_id]
311
312     if isinstance(obj, basestring):
313         # Copy any collections identified in this string to dst, replacing
314         # them with the dst uuids as necessary.
315         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
316         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
317         return obj
318     elif type(obj) == dict:
319         return {v: copy_collections(obj[v], src, dst, args) for v in obj}
320     elif type(obj) == list:
321         return [copy_collections(v, src, dst, args) for v in obj]
322     return obj
323
324 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
325     """Copy a job's script to the destination repository, and update its record.
326
327     Given a jobspec dictionary, this function finds the referenced script from
328     src and copies it to dst and dst_repo.  It also updates jobspec in place to
329     refer to names on the destination.
330     """
331     repo = jobspec.get('repository')
332     if repo is None:
333         return
334     # script_version is the "script_version" parameter from the source
335     # component or job.  If no script_version was supplied in the
336     # component or job, it is a mistake in the pipeline, but for the
337     # purposes of copying the repository, default to "master".
338     script_version = jobspec.get('script_version') or 'master'
339     script_key = (repo, script_version)
340     if script_key not in scripts_copied:
341         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
342         scripts_copied.add(script_key)
343     jobspec['repository'] = dst_repo
344     repo_dir = local_repo_dir[repo]
345     for version_key in ['script_version', 'supplied_script_version']:
346         if version_key in jobspec:
347             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
348
349 # copy_git_repos(p, src, dst, dst_repo, args)
350 #
351 #    Copies all git repositories referenced by pipeline instance or
352 #    template 'p' from src to dst.
353 #
354 #    For each component c in the pipeline:
355 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
356 #      * Rename script versions:
357 #          * c['script_version']
358 #          * c['job']['script_version']
359 #          * c['job']['supplied_script_version']
360 #        to the commit hashes they resolve to, since any symbolic
361 #        names (tags, branches) are not preserved in the destination repo.
362 #
363 #    The pipeline object is updated in place with the new repository
364 #    names.  The return value is undefined.
365 #
366 def copy_git_repos(p, src, dst, dst_repo, args):
367     for component in p['components'].itervalues():
368         migrate_jobspec(component, src, dst, dst_repo, args)
369         if 'job' in component:
370             migrate_jobspec(component['job'], src, dst, dst_repo, args)
371
372 def total_collection_size(manifest_text):
373     """Return the total number of bytes in this collection (excluding
374     duplicate blocks)."""
375
376     total_bytes = 0
377     locators_seen = {}
378     for line in manifest_text.splitlines():
379         words = line.split()
380         for word in words[1:]:
381             try:
382                 loc = arvados.KeepLocator(word)
383             except ValueError:
384                 continue  # this word isn't a locator, skip it
385             if loc.md5sum not in locators_seen:
386                 locators_seen[loc.md5sum] = True
387                 total_bytes += loc.size
388
389     return total_bytes
390
391 def create_collection_from(c, src, dst, args):
392     """Create a new collection record on dst, and copy Docker metadata if
393     available."""
394
395     collection_uuid = c['uuid']
396     del c['uuid']
397
398     if not c["name"]:
399         c['name'] = "copied from " + collection_uuid
400
401     if 'properties' in c:
402         del c['properties']
403
404     c['owner_uuid'] = args.project_uuid
405
406     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
407
408     # Create docker_image_repo+tag and docker_image_hash links
409     # at the destination.
410     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
411         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
412
413         for d in docker_links:
414             body={
415                 'head_uuid': dst_collection['uuid'],
416                 'link_class': link_class,
417                 'name': d['name'],
418             }
419             body['owner_uuid'] = args.project_uuid
420
421             lk = dst.links().create(body=body).execute(num_retries=args.retries)
422             logger.debug('created dst link {}'.format(lk))
423
424     return dst_collection
425
426 # copy_collection(obj_uuid, src, dst, args)
427 #
428 #    Copies the collection identified by obj_uuid from src to dst.
429 #    Returns the collection object created at dst.
430 #
431 #    If args.progress is True, produce a human-friendly progress
432 #    report.
433 #
434 #    If a collection with the desired portable_data_hash already
435 #    exists at dst, and args.force is False, copy_collection returns
436 #    the existing collection without copying any blocks.  Otherwise
437 #    (if no collection exists or if args.force is True)
438 #    copy_collection copies all of the collection data blocks from src
439 #    to dst.
440 #
441 #    For this application, it is critical to preserve the
442 #    collection's manifest hash, which is not guaranteed with the
443 #    arvados.CollectionReader and arvados.CollectionWriter classes.
444 #    Copying each block in the collection manually, followed by
445 #    the manifest block, ensures that the collection's manifest
446 #    hash will not change.
447 #
448 def copy_collection(obj_uuid, src, dst, args):
449     if arvados.util.keep_locator_pattern.match(obj_uuid):
450         # If the obj_uuid is a portable data hash, it might not be uniquely
451         # identified with a particular collection.  As a result, it is
452         # ambigious as to what name to use for the copy.  Apply some heuristics
453         # to pick which collection to get the name from.
454         srccol = src.collections().list(
455             filters=[['portable_data_hash', '=', obj_uuid]],
456             order="created_at asc"
457             ).execute(num_retries=args.retries)
458
459         items = srccol.get("items")
460
461         if not items:
462             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
463             return
464
465         c = None
466
467         if len(items) == 1:
468             # There's only one collection with the PDH, so use that.
469             c = items[0]
470         if not c:
471             # See if there is a collection that's in the same project
472             # as the root item (usually a pipeline) being copied.
473             for i in items:
474                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
475                     c = i
476                     break
477         if not c:
478             # Didn't find any collections located in the same project, so
479             # pick the oldest collection that has a name assigned to it.
480             for i in items:
481                 if i.get("name"):
482                     c = i
483                     break
484         if not c:
485             # None of the collections have names (?!), so just pick the
486             # first one.
487             c = items[0]
488
489         # list() doesn't return manifest text (and we don't want it to,
490         # because we don't need the same maninfest text sent to us 50
491         # times) so go and retrieve the collection object directly
492         # which will include the manifest text.
493         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
494     else:
495         # Assume this is an actual collection uuid, so fetch it directly.
496         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
497
498     # If a collection with this hash already exists at the
499     # destination, and 'force' is not true, just return that
500     # collection.
501     if not args.force:
502         if 'portable_data_hash' in c:
503             colhash = c['portable_data_hash']
504         else:
505             colhash = c['uuid']
506         dstcol = dst.collections().list(
507             filters=[['portable_data_hash', '=', colhash]]
508         ).execute(num_retries=args.retries)
509         if dstcol['items_available'] > 0:
510             for d in dstcol['items']:
511                 if ((args.project_uuid == d['owner_uuid']) and
512                     (c.get('name') == d['name']) and
513                     (c['portable_data_hash'] == d['portable_data_hash'])):
514                     return d
515             c['manifest_text'] = dst.collections().get(
516                 uuid=dstcol['items'][0]['uuid']
517             ).execute(num_retries=args.retries)['manifest_text']
518             return create_collection_from(c, src, dst, args)
519
520     # Fetch the collection's manifest.
521     manifest = c['manifest_text']
522     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
523
524     # Copy each block from src_keep to dst_keep.
525     # Use the newly signed locators returned from dst_keep to build
526     # a new manifest as we go.
527     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
528     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
529     dst_manifest = ""
530     dst_locators = {}
531     bytes_written = 0
532     bytes_expected = total_collection_size(manifest)
533     if args.progress:
534         progress_writer = ProgressWriter(human_progress)
535     else:
536         progress_writer = None
537
538     for line in manifest.splitlines(True):
539         words = line.split()
540         dst_manifest_line = words[0]
541         for word in words[1:]:
542             try:
543                 loc = arvados.KeepLocator(word)
544                 blockhash = loc.md5sum
545                 # copy this block if we haven't seen it before
546                 # (otherwise, just reuse the existing dst_locator)
547                 if blockhash not in dst_locators:
548                     logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
549                     if progress_writer:
550                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
551                     data = src_keep.get(word)
552                     dst_locator = dst_keep.put(data)
553                     dst_locators[blockhash] = dst_locator
554                     bytes_written += loc.size
555                 dst_manifest_line += ' ' + dst_locators[blockhash]
556             except ValueError:
557                 # If 'word' can't be parsed as a locator,
558                 # presume it's a filename.
559                 dst_manifest_line += ' ' + word
560         dst_manifest += dst_manifest_line
561         if line.endswith("\n"):
562             dst_manifest += "\n"
563
564     if progress_writer:
565         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
566         progress_writer.finish()
567
568     # Copy the manifest and save the collection.
569     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
570
571     dst_keep.put(dst_manifest.encode('utf-8'))
572     c['manifest_text'] = dst_manifest
573     return create_collection_from(c, src, dst, args)
574
575 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
576 #
577 #    Copies commits from git repository 'src_git_repo' on Arvados
578 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
579 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
580 #    or "jsmith")
581 #
582 #    All commits will be copied to a destination branch named for the
583 #    source repository URL.
584 #
585 #    The destination repository must already exist.
586 #
587 #    The user running this command must be authenticated
588 #    to both repositories.
589 #
590 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
591     # Identify the fetch and push URLs for the git repositories.
592     r = src.repositories().list(
593         filters=[['name', '=', src_git_repo]]).execute(num_retries=args.retries)
594     if r['items_available'] != 1:
595         raise Exception('cannot identify source repo {}; {} repos found'
596                         .format(src_git_repo, r['items_available']))
597     src_git_url = r['items'][0]['fetch_url']
598     logger.debug('src_git_url: {}'.format(src_git_url))
599
600     r = dst.repositories().list(
601         filters=[['name', '=', dst_git_repo]]).execute(num_retries=args.retries)
602     if r['items_available'] != 1:
603         raise Exception('cannot identify destination repo {}; {} repos found'
604                         .format(dst_git_repo, r['items_available']))
605     dst_git_push_url  = r['items'][0]['push_url']
606     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
607
608     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
609
610     # Copy git commits from src repo to dst repo.
611     if src_git_repo not in local_repo_dir:
612         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
613         arvados.util.run_command(
614             ["git", "clone", "--bare", src_git_url,
615              local_repo_dir[src_git_repo]],
616             cwd=os.path.dirname(local_repo_dir[src_git_repo]))
617         arvados.util.run_command(
618             ["git", "remote", "add", "dst", dst_git_push_url],
619             cwd=local_repo_dir[src_git_repo])
620     arvados.util.run_command(
621         ["git", "branch", dst_branch, script_version],
622         cwd=local_repo_dir[src_git_repo])
623     arvados.util.run_command(["git", "push", "dst", dst_branch],
624                              cwd=local_repo_dir[src_git_repo])
625
626 def copy_docker_images(pipeline, src, dst, args):
627     """Copy any docker images named in the pipeline components'
628     runtime_constraints field from src to dst."""
629
630     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
631     for c_name, c_info in pipeline['components'].iteritems():
632         if ('runtime_constraints' in c_info and
633             'docker_image' in c_info['runtime_constraints']):
634             copy_docker_image(
635                 c_info['runtime_constraints']['docker_image'],
636                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
637                 src, dst, args)
638
639
640 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
641     """Copy the docker image identified by docker_image and
642     docker_image_tag from src to dst. Create appropriate
643     docker_image_repo+tag and docker_image_hash links at dst.
644
645     """
646
647     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
648
649     # Find the link identifying this docker image.
650     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
651         src, args.retries, docker_image, docker_image_tag)
652     if docker_image_list:
653         image_uuid, image_info = docker_image_list[0]
654         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
655
656         # Copy the collection it refers to.
657         dst_image_col = copy_collection(image_uuid, src, dst, args)
658     elif arvados.util.keep_locator_pattern.match(docker_image):
659         dst_image_col = copy_collection(docker_image, src, dst, args)
660     else:
661         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
662
663 # git_rev_parse(rev, repo)
664 #
665 #    Returns the 40-character commit hash corresponding to 'rev' in
666 #    git repository 'repo' (which must be the path of a local git
667 #    repository)
668 #
669 def git_rev_parse(rev, repo):
670     gitout, giterr = arvados.util.run_command(
671         ['git', 'rev-parse', rev], cwd=repo)
672     return gitout.strip()
673
674 # uuid_type(api, object_uuid)
675 #
676 #    Returns the name of the class that object_uuid belongs to, based on
677 #    the second field of the uuid.  This function consults the api's
678 #    schema to identify the object class.
679 #
680 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
681 #
682 #    Special case: if handed a Keep locator hash, return 'Collection'.
683 #
684 def uuid_type(api, object_uuid):
685     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
686         return 'Collection'
687     p = object_uuid.split('-')
688     if len(p) == 3:
689         type_prefix = p[1]
690         for k in api._schema.schemas:
691             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
692             if type_prefix == obj_class:
693                 return k
694     return None
695
696 def abort(msg, code=1):
697     logger.info("arv-copy: %s", msg)
698     exit(code)
699
700
701 # Code for reporting on the progress of a collection upload.
702 # Stolen from arvados.commands.put.ArvPutCollectionWriter
703 # TODO(twp): figure out how to refactor into a shared library
704 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
705 # code)
706
707 def machine_progress(obj_uuid, bytes_written, bytes_expected):
708     return "{} {}: {} {} written {} total\n".format(
709         sys.argv[0],
710         os.getpid(),
711         obj_uuid,
712         bytes_written,
713         -1 if (bytes_expected is None) else bytes_expected)
714
715 def human_progress(obj_uuid, bytes_written, bytes_expected):
716     if bytes_expected:
717         return "\r{}: {}M / {}M {:.1%} ".format(
718             obj_uuid,
719             bytes_written >> 20, bytes_expected >> 20,
720             float(bytes_written) / bytes_expected)
721     else:
722         return "\r{}: {} ".format(obj_uuid, bytes_written)
723
724 class ProgressWriter(object):
725     _progress_func = None
726     outfile = sys.stderr
727
728     def __init__(self, progress_func):
729         self._progress_func = progress_func
730
731     def report(self, obj_uuid, bytes_written, bytes_expected):
732         if self._progress_func is not None:
733             self.outfile.write(
734                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
735
736     def finish(self):
737         self.outfile.write("\n")
738
739 if __name__ == '__main__':
740     main()