Merge branch 'master' into 5720-ajax-loading-error
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31 import arvados.util
32 import arvados.commands._util as arv_cmd
33 import arvados.commands.keepdocker
34
35 logger = logging.getLogger('arvados.arv-copy')
36
37 # local_repo_dir records which git repositories from the Arvados source
38 # instance have been checked out locally during this run, and to which
39 # directories.
40 # e.g. if repository 'twp' from src_arv has been cloned into
41 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
42 #
43 local_repo_dir = {}
44
45 # List of collections that have been copied in this session, and their
46 # destination collection UUIDs.
47 collections_copied = {}
48
49 # Set of (repository, script_version) two-tuples of commits copied in git.
50 scripts_copied = set()
51
52 # The owner_uuid of the object being copied
53 src_owner_uuid = None
54
55 def main():
56     copy_opts = argparse.ArgumentParser(add_help=False)
57
58     copy_opts.add_argument(
59         '-v', '--verbose', dest='verbose', action='store_true',
60         help='Verbose output.')
61     copy_opts.add_argument(
62         '--progress', dest='progress', action='store_true',
63         help='Report progress on copying collections. (default)')
64     copy_opts.add_argument(
65         '--no-progress', dest='progress', action='store_false',
66         help='Do not report progress on copying collections.')
67     copy_opts.add_argument(
68         '-f', '--force', dest='force', action='store_true',
69         help='Perform copy even if the object appears to exist at the remote destination.')
70     copy_opts.add_argument(
71         '--src', dest='source_arvados', required=True,
72         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
73     copy_opts.add_argument(
74         '--dst', dest='destination_arvados', required=True,
75         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
76     copy_opts.add_argument(
77         '--recursive', dest='recursive', action='store_true',
78         help='Recursively copy any dependencies for this object. (default)')
79     copy_opts.add_argument(
80         '--no-recursive', dest='recursive', action='store_false',
81         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
82     copy_opts.add_argument(
83         '--dst-git-repo', dest='dst_git_repo',
84         help='The name of the destination git repository. Required when copying a pipeline recursively.')
85     copy_opts.add_argument(
86         '--project-uuid', dest='project_uuid',
87         help='The UUID of the project at the destination to which the pipeline should be copied.')
88     copy_opts.add_argument(
89         'object_uuid',
90         help='The UUID of the object to be copied.')
91     copy_opts.set_defaults(progress=True)
92     copy_opts.set_defaults(recursive=True)
93
94     parser = argparse.ArgumentParser(
95         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
96         parents=[copy_opts, arv_cmd.retry_opt])
97     args = parser.parse_args()
98
99     if args.verbose:
100         logger.setLevel(logging.DEBUG)
101     else:
102         logger.setLevel(logging.INFO)
103
104     # Create API clients for the source and destination instances
105     src_arv = api_for_instance(args.source_arvados)
106     dst_arv = api_for_instance(args.destination_arvados)
107
108     if not args.project_uuid:
109         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
110
111     # Identify the kind of object we have been given, and begin copying.
112     t = uuid_type(src_arv, args.object_uuid)
113     if t == 'Collection':
114         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
115         result = copy_collection(args.object_uuid,
116                                  src_arv, dst_arv,
117                                  args)
118     elif t == 'PipelineInstance':
119         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
120         result = copy_pipeline_instance(args.object_uuid,
121                                         src_arv, dst_arv,
122                                         args)
123     elif t == 'PipelineTemplate':
124         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
125         result = copy_pipeline_template(args.object_uuid,
126                                         src_arv, dst_arv, args)
127     else:
128         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
129
130     # Clean up any outstanding temp git repositories.
131     for d in local_repo_dir.values():
132         shutil.rmtree(d, ignore_errors=True)
133
134     # If no exception was thrown and the response does not have an
135     # error_token field, presume success
136     if 'error_token' in result or 'uuid' not in result:
137         logger.error("API server returned an error result: {}".format(result))
138         exit(1)
139
140     logger.info("")
141     logger.info("Success: created copy with uuid {}".format(result['uuid']))
142     exit(0)
143
144 def set_src_owner_uuid(resource, uuid, args):
145     global src_owner_uuid
146     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
147     src_owner_uuid = c.get("owner_uuid")
148
149 # api_for_instance(instance_name)
150 #
151 #     Creates an API client for the Arvados instance identified by
152 #     instance_name.
153 #
154 #     If instance_name contains a slash, it is presumed to be a path
155 #     (either local or absolute) to a file with Arvados configuration
156 #     settings.
157 #
158 #     Otherwise, it is presumed to be the name of a file in
159 #     $HOME/.config/arvados/instance_name.conf
160 #
161 def api_for_instance(instance_name):
162     if '/' in instance_name:
163         config_file = instance_name
164     else:
165         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
166
167     try:
168         cfg = arvados.config.load(config_file)
169     except (IOError, OSError) as e:
170         abort(("Could not open config file {}: {}\n" +
171                "You must make sure that your configuration tokens\n" +
172                "for Arvados instance {} are in {} and that this\n" +
173                "file is readable.").format(
174                    config_file, e, instance_name, config_file))
175
176     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
177         api_is_insecure = (
178             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
179                 ['1', 't', 'true', 'y', 'yes']))
180         client = arvados.api('v1',
181                              host=cfg['ARVADOS_API_HOST'],
182                              token=cfg['ARVADOS_API_TOKEN'],
183                              insecure=api_is_insecure)
184     else:
185         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
186     return client
187
188 # copy_pipeline_instance(pi_uuid, src, dst, args)
189 #
190 #    Copies a pipeline instance identified by pi_uuid from src to dst.
191 #
192 #    If the args.recursive option is set:
193 #      1. Copies all input collections
194 #           * For each component in the pipeline, include all collections
195 #             listed as job dependencies for that component)
196 #      2. Copy docker images
197 #      3. Copy git repositories
198 #      4. Copy the pipeline template
199 #
200 #    The only changes made to the copied pipeline instance are:
201 #      1. The original pipeline instance UUID is preserved in
202 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
203 #      2. The pipeline_template_uuid is changed to the new template uuid.
204 #      3. The owner_uuid of the instance is changed to the user who
205 #         copied it.
206 #
207 def copy_pipeline_instance(pi_uuid, src, dst, args):
208     # Fetch the pipeline instance record.
209     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
210
211     if args.recursive:
212         if not args.dst_git_repo:
213             abort('--dst-git-repo is required when copying a pipeline recursively.')
214         # Copy the pipeline template and save the copied template.
215         if pi.get('pipeline_template_uuid', None):
216             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
217                                         src, dst, args)
218
219         # Copy input collections, docker images and git repos.
220         pi = copy_collections(pi, src, dst, args)
221         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
222         copy_docker_images(pi, src, dst, args)
223
224         # Update the fields of the pipeline instance with the copied
225         # pipeline template.
226         if pi.get('pipeline_template_uuid', None):
227             pi['pipeline_template_uuid'] = pt['uuid']
228
229     else:
230         # not recursive
231         logger.info("Copying only pipeline instance %s.", pi_uuid)
232         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
233
234     # Update the pipeline instance properties, and create the new
235     # instance at dst.
236     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
237     pi['description'] = "Pipeline copied from {}\n\n{}".format(
238         pi_uuid,
239         pi['description'] if pi.get('description', None) else '')
240
241     pi['owner_uuid'] = args.project_uuid
242
243     del pi['uuid']
244
245     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
246     return new_pi
247
248 # copy_pipeline_template(pt_uuid, src, dst, args)
249 #
250 #    Copies a pipeline template identified by pt_uuid from src to dst.
251 #
252 #    If args.recursive is True, also copy any collections, docker
253 #    images and git repositories that this template references.
254 #
255 #    The owner_uuid of the new template is changed to that of the user
256 #    who copied the template.
257 #
258 #    Returns the copied pipeline template object.
259 #
260 def copy_pipeline_template(pt_uuid, src, dst, args):
261     # fetch the pipeline template from the source instance
262     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
263
264     if args.recursive:
265         if not args.dst_git_repo:
266             abort('--dst-git-repo is required when copying a pipeline recursively.')
267         # Copy input collections, docker images and git repos.
268         pt = copy_collections(pt, src, dst, args)
269         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
270         copy_docker_images(pt, src, dst, args)
271
272     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
273         pt_uuid,
274         pt['description'] if pt.get('description', None) else '')
275     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
276     del pt['uuid']
277
278     pt['owner_uuid'] = args.project_uuid
279
280     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
281
282 # copy_collections(obj, src, dst, args)
283 #
284 #    Recursively copies all collections referenced by 'obj' from src
285 #    to dst.  obj may be a dict or a list, in which case we run
286 #    copy_collections on every value it contains. If it is a string,
287 #    search it for any substring that matches a collection hash or uuid
288 #    (this will find hidden references to collections like
289 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
290 #
291 #    Returns a copy of obj with any old collection uuids replaced by
292 #    the new ones.
293 #
294 def copy_collections(obj, src, dst, args):
295
296     def copy_collection_fn(collection_match):
297         """Helper function for regex substitution: copies a single collection,
298         identified by the collection_match MatchObject, to the
299         destination.  Returns the destination collection uuid (or the
300         portable data hash if that's what src_id is).
301
302         """
303         src_id = collection_match.group(0)
304         if src_id not in collections_copied:
305             dst_col = copy_collection(src_id, src, dst, args)
306             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
307                 collections_copied[src_id] = src_id
308             else:
309                 collections_copied[src_id] = dst_col['uuid']
310         return collections_copied[src_id]
311
312     if isinstance(obj, basestring):
313         # Copy any collections identified in this string to dst, replacing
314         # them with the dst uuids as necessary.
315         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
316         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
317         return obj
318     elif type(obj) == dict:
319         return {v: copy_collections(obj[v], src, dst, args) for v in obj}
320     elif type(obj) == list:
321         return [copy_collections(v, src, dst, args) for v in obj]
322     return obj
323
324 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
325     """Copy a job's script to the destination repository, and update its record.
326
327     Given a jobspec dictionary, this function finds the referenced script from
328     src and copies it to dst and dst_repo.  It also updates jobspec in place to
329     refer to names on the destination.
330     """
331     repo = jobspec.get('repository')
332     if repo is None:
333         return
334     # script_version is the "script_version" parameter from the source
335     # component or job.  If no script_version was supplied in the
336     # component or job, it is a mistake in the pipeline, but for the
337     # purposes of copying the repository, default to "master".
338     script_version = jobspec.get('script_version') or 'master'
339     script_key = (repo, script_version)
340     if script_key not in scripts_copied:
341         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
342         scripts_copied.add(script_key)
343     jobspec['repository'] = dst_repo
344     repo_dir = local_repo_dir[repo]
345     for version_key in ['script_version', 'supplied_script_version']:
346         if version_key in jobspec:
347             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
348
349 # copy_git_repos(p, src, dst, dst_repo, args)
350 #
351 #    Copies all git repositories referenced by pipeline instance or
352 #    template 'p' from src to dst.
353 #
354 #    For each component c in the pipeline:
355 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
356 #      * Rename script versions:
357 #          * c['script_version']
358 #          * c['job']['script_version']
359 #          * c['job']['supplied_script_version']
360 #        to the commit hashes they resolve to, since any symbolic
361 #        names (tags, branches) are not preserved in the destination repo.
362 #
363 #    The pipeline object is updated in place with the new repository
364 #    names.  The return value is undefined.
365 #
366 def copy_git_repos(p, src, dst, dst_repo, args):
367     for component in p['components'].itervalues():
368         migrate_jobspec(component, src, dst, dst_repo, args)
369         if 'job' in component:
370             migrate_jobspec(component['job'], src, dst, dst_repo, args)
371
372 def total_collection_size(manifest_text):
373     """Return the total number of bytes in this collection (excluding
374     duplicate blocks)."""
375
376     total_bytes = 0
377     locators_seen = {}
378     for line in manifest_text.splitlines():
379         words = line.split()
380         for word in words[1:]:
381             try:
382                 loc = arvados.KeepLocator(word)
383             except ValueError:
384                 continue  # this word isn't a locator, skip it
385             if loc.md5sum not in locators_seen:
386                 locators_seen[loc.md5sum] = True
387                 total_bytes += loc.size
388
389     return total_bytes
390
391 def create_collection_from(c, src, dst, args):
392     """Create a new collection record on dst, and copy Docker metadata if
393     available."""
394
395     collection_uuid = c['uuid']
396     del c['uuid']
397
398     if not c["name"]:
399         c['name'] = "copied from " + collection_uuid
400
401     if 'properties' in c:
402         del c['properties']
403
404     c['owner_uuid'] = args.project_uuid
405
406     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
407
408     # Create docker_image_repo+tag and docker_image_hash links
409     # at the destination.
410     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
411         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
412
413         for src_link in docker_links:
414             body = {key: src_link[key]
415                     for key in ['link_class', 'name', 'properties']}
416             body['head_uuid'] = dst_collection['uuid']
417             body['owner_uuid'] = args.project_uuid
418
419             lk = dst.links().create(body=body).execute(num_retries=args.retries)
420             logger.debug('created dst link {}'.format(lk))
421
422     return dst_collection
423
424 # copy_collection(obj_uuid, src, dst, args)
425 #
426 #    Copies the collection identified by obj_uuid from src to dst.
427 #    Returns the collection object created at dst.
428 #
429 #    If args.progress is True, produce a human-friendly progress
430 #    report.
431 #
432 #    If a collection with the desired portable_data_hash already
433 #    exists at dst, and args.force is False, copy_collection returns
434 #    the existing collection without copying any blocks.  Otherwise
435 #    (if no collection exists or if args.force is True)
436 #    copy_collection copies all of the collection data blocks from src
437 #    to dst.
438 #
439 #    For this application, it is critical to preserve the
440 #    collection's manifest hash, which is not guaranteed with the
441 #    arvados.CollectionReader and arvados.CollectionWriter classes.
442 #    Copying each block in the collection manually, followed by
443 #    the manifest block, ensures that the collection's manifest
444 #    hash will not change.
445 #
446 def copy_collection(obj_uuid, src, dst, args):
447     if arvados.util.keep_locator_pattern.match(obj_uuid):
448         # If the obj_uuid is a portable data hash, it might not be uniquely
449         # identified with a particular collection.  As a result, it is
450         # ambigious as to what name to use for the copy.  Apply some heuristics
451         # to pick which collection to get the name from.
452         srccol = src.collections().list(
453             filters=[['portable_data_hash', '=', obj_uuid]],
454             order="created_at asc"
455             ).execute(num_retries=args.retries)
456
457         items = srccol.get("items")
458
459         if not items:
460             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
461             return
462
463         c = None
464
465         if len(items) == 1:
466             # There's only one collection with the PDH, so use that.
467             c = items[0]
468         if not c:
469             # See if there is a collection that's in the same project
470             # as the root item (usually a pipeline) being copied.
471             for i in items:
472                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
473                     c = i
474                     break
475         if not c:
476             # Didn't find any collections located in the same project, so
477             # pick the oldest collection that has a name assigned to it.
478             for i in items:
479                 if i.get("name"):
480                     c = i
481                     break
482         if not c:
483             # None of the collections have names (?!), so just pick the
484             # first one.
485             c = items[0]
486
487         # list() doesn't return manifest text (and we don't want it to,
488         # because we don't need the same maninfest text sent to us 50
489         # times) so go and retrieve the collection object directly
490         # which will include the manifest text.
491         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
492     else:
493         # Assume this is an actual collection uuid, so fetch it directly.
494         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
495
496     # If a collection with this hash already exists at the
497     # destination, and 'force' is not true, just return that
498     # collection.
499     if not args.force:
500         if 'portable_data_hash' in c:
501             colhash = c['portable_data_hash']
502         else:
503             colhash = c['uuid']
504         dstcol = dst.collections().list(
505             filters=[['portable_data_hash', '=', colhash]]
506         ).execute(num_retries=args.retries)
507         if dstcol['items_available'] > 0:
508             for d in dstcol['items']:
509                 if ((args.project_uuid == d['owner_uuid']) and
510                     (c.get('name') == d['name']) and
511                     (c['portable_data_hash'] == d['portable_data_hash'])):
512                     return d
513             c['manifest_text'] = dst.collections().get(
514                 uuid=dstcol['items'][0]['uuid']
515             ).execute(num_retries=args.retries)['manifest_text']
516             return create_collection_from(c, src, dst, args)
517
518     # Fetch the collection's manifest.
519     manifest = c['manifest_text']
520     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
521
522     # Copy each block from src_keep to dst_keep.
523     # Use the newly signed locators returned from dst_keep to build
524     # a new manifest as we go.
525     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
526     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
527     dst_manifest = ""
528     dst_locators = {}
529     bytes_written = 0
530     bytes_expected = total_collection_size(manifest)
531     if args.progress:
532         progress_writer = ProgressWriter(human_progress)
533     else:
534         progress_writer = None
535
536     for line in manifest.splitlines(True):
537         words = line.split()
538         dst_manifest_line = words[0]
539         for word in words[1:]:
540             try:
541                 loc = arvados.KeepLocator(word)
542                 blockhash = loc.md5sum
543                 # copy this block if we haven't seen it before
544                 # (otherwise, just reuse the existing dst_locator)
545                 if blockhash not in dst_locators:
546                     logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
547                     if progress_writer:
548                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
549                     data = src_keep.get(word)
550                     dst_locator = dst_keep.put(data)
551                     dst_locators[blockhash] = dst_locator
552                     bytes_written += loc.size
553                 dst_manifest_line += ' ' + dst_locators[blockhash]
554             except ValueError:
555                 # If 'word' can't be parsed as a locator,
556                 # presume it's a filename.
557                 dst_manifest_line += ' ' + word
558         dst_manifest += dst_manifest_line
559         if line.endswith("\n"):
560             dst_manifest += "\n"
561
562     if progress_writer:
563         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
564         progress_writer.finish()
565
566     # Copy the manifest and save the collection.
567     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
568
569     dst_keep.put(dst_manifest.encode('utf-8'))
570     c['manifest_text'] = dst_manifest
571     return create_collection_from(c, src, dst, args)
572
573 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
574 #
575 #    Copies commits from git repository 'src_git_repo' on Arvados
576 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
577 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
578 #    or "jsmith")
579 #
580 #    All commits will be copied to a destination branch named for the
581 #    source repository URL.
582 #
583 #    The destination repository must already exist.
584 #
585 #    The user running this command must be authenticated
586 #    to both repositories.
587 #
588 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
589     # Identify the fetch and push URLs for the git repositories.
590     r = src.repositories().list(
591         filters=[['name', '=', src_git_repo]]).execute(num_retries=args.retries)
592     if r['items_available'] != 1:
593         raise Exception('cannot identify source repo {}; {} repos found'
594                         .format(src_git_repo, r['items_available']))
595     src_git_url = r['items'][0]['fetch_url']
596     logger.debug('src_git_url: {}'.format(src_git_url))
597
598     r = dst.repositories().list(
599         filters=[['name', '=', dst_git_repo]]).execute(num_retries=args.retries)
600     if r['items_available'] != 1:
601         raise Exception('cannot identify destination repo {}; {} repos found'
602                         .format(dst_git_repo, r['items_available']))
603     dst_git_push_url  = r['items'][0]['push_url']
604     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
605
606     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
607
608     # Copy git commits from src repo to dst repo.
609     if src_git_repo not in local_repo_dir:
610         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
611         arvados.util.run_command(
612             ["git", "clone", "--bare", src_git_url,
613              local_repo_dir[src_git_repo]],
614             cwd=os.path.dirname(local_repo_dir[src_git_repo]))
615         arvados.util.run_command(
616             ["git", "remote", "add", "dst", dst_git_push_url],
617             cwd=local_repo_dir[src_git_repo])
618     arvados.util.run_command(
619         ["git", "branch", dst_branch, script_version],
620         cwd=local_repo_dir[src_git_repo])
621     arvados.util.run_command(["git", "push", "dst", dst_branch],
622                              cwd=local_repo_dir[src_git_repo])
623
624 def copy_docker_images(pipeline, src, dst, args):
625     """Copy any docker images named in the pipeline components'
626     runtime_constraints field from src to dst."""
627
628     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
629     for c_name, c_info in pipeline['components'].iteritems():
630         if ('runtime_constraints' in c_info and
631             'docker_image' in c_info['runtime_constraints']):
632             copy_docker_image(
633                 c_info['runtime_constraints']['docker_image'],
634                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
635                 src, dst, args)
636
637
638 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
639     """Copy the docker image identified by docker_image and
640     docker_image_tag from src to dst. Create appropriate
641     docker_image_repo+tag and docker_image_hash links at dst.
642
643     """
644
645     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
646
647     # Find the link identifying this docker image.
648     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
649         src, args.retries, docker_image, docker_image_tag)
650     if docker_image_list:
651         image_uuid, image_info = docker_image_list[0]
652         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
653
654         # Copy the collection it refers to.
655         dst_image_col = copy_collection(image_uuid, src, dst, args)
656     elif arvados.util.keep_locator_pattern.match(docker_image):
657         dst_image_col = copy_collection(docker_image, src, dst, args)
658     else:
659         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
660
661 # git_rev_parse(rev, repo)
662 #
663 #    Returns the 40-character commit hash corresponding to 'rev' in
664 #    git repository 'repo' (which must be the path of a local git
665 #    repository)
666 #
667 def git_rev_parse(rev, repo):
668     gitout, giterr = arvados.util.run_command(
669         ['git', 'rev-parse', rev], cwd=repo)
670     return gitout.strip()
671
672 # uuid_type(api, object_uuid)
673 #
674 #    Returns the name of the class that object_uuid belongs to, based on
675 #    the second field of the uuid.  This function consults the api's
676 #    schema to identify the object class.
677 #
678 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
679 #
680 #    Special case: if handed a Keep locator hash, return 'Collection'.
681 #
682 def uuid_type(api, object_uuid):
683     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
684         return 'Collection'
685     p = object_uuid.split('-')
686     if len(p) == 3:
687         type_prefix = p[1]
688         for k in api._schema.schemas:
689             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
690             if type_prefix == obj_class:
691                 return k
692     return None
693
694 def abort(msg, code=1):
695     logger.info("arv-copy: %s", msg)
696     exit(code)
697
698
699 # Code for reporting on the progress of a collection upload.
700 # Stolen from arvados.commands.put.ArvPutCollectionWriter
701 # TODO(twp): figure out how to refactor into a shared library
702 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
703 # code)
704
705 def machine_progress(obj_uuid, bytes_written, bytes_expected):
706     return "{} {}: {} {} written {} total\n".format(
707         sys.argv[0],
708         os.getpid(),
709         obj_uuid,
710         bytes_written,
711         -1 if (bytes_expected is None) else bytes_expected)
712
713 def human_progress(obj_uuid, bytes_written, bytes_expected):
714     if bytes_expected:
715         return "\r{}: {}M / {}M {:.1%} ".format(
716             obj_uuid,
717             bytes_written >> 20, bytes_expected >> 20,
718             float(bytes_written) / bytes_expected)
719     else:
720         return "\r{}: {} ".format(obj_uuid, bytes_written)
721
722 class ProgressWriter(object):
723     _progress_func = None
724     outfile = sys.stderr
725
726     def __init__(self, progress_func):
727         self._progress_func = progress_func
728
729     def report(self, obj_uuid, bytes_written, bytes_expected):
730         if self._progress_func is not None:
731             self.outfile.write(
732                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
733
734     def finish(self):
735         self.outfile.write("\n")
736
737 if __name__ == '__main__':
738     main()