Merge branch 'master' into 6588-documentation
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31 import arvados.util
32 import arvados.commands._util as arv_cmd
33 import arvados.commands.keepdocker
34
35 from arvados.api import OrderedJsonModel
36
37 logger = logging.getLogger('arvados.arv-copy')
38
39 # local_repo_dir records which git repositories from the Arvados source
40 # instance have been checked out locally during this run, and to which
41 # directories.
42 # e.g. if repository 'twp' from src_arv has been cloned into
43 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
44 #
45 local_repo_dir = {}
46
47 # List of collections that have been copied in this session, and their
48 # destination collection UUIDs.
49 collections_copied = {}
50
51 # Set of (repository, script_version) two-tuples of commits copied in git.
52 scripts_copied = set()
53
54 # The owner_uuid of the object being copied
55 src_owner_uuid = None
56
57 def main():
58     copy_opts = argparse.ArgumentParser(add_help=False)
59
60     copy_opts.add_argument(
61         '-v', '--verbose', dest='verbose', action='store_true',
62         help='Verbose output.')
63     copy_opts.add_argument(
64         '--progress', dest='progress', action='store_true',
65         help='Report progress on copying collections. (default)')
66     copy_opts.add_argument(
67         '--no-progress', dest='progress', action='store_false',
68         help='Do not report progress on copying collections.')
69     copy_opts.add_argument(
70         '-f', '--force', dest='force', action='store_true',
71         help='Perform copy even if the object appears to exist at the remote destination.')
72     copy_opts.add_argument(
73         '--src', dest='source_arvados', required=True,
74         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
75     copy_opts.add_argument(
76         '--dst', dest='destination_arvados', required=True,
77         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
78     copy_opts.add_argument(
79         '--recursive', dest='recursive', action='store_true',
80         help='Recursively copy any dependencies for this object. (default)')
81     copy_opts.add_argument(
82         '--no-recursive', dest='recursive', action='store_false',
83         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
84     copy_opts.add_argument(
85         '--dst-git-repo', dest='dst_git_repo',
86         help='The name of the destination git repository. Required when copying a pipeline recursively.')
87     copy_opts.add_argument(
88         '--project-uuid', dest='project_uuid',
89         help='The UUID of the project at the destination to which the pipeline should be copied.')
90     copy_opts.add_argument(
91         'object_uuid',
92         help='The UUID of the object to be copied.')
93     copy_opts.set_defaults(progress=True)
94     copy_opts.set_defaults(recursive=True)
95
96     parser = argparse.ArgumentParser(
97         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
98         parents=[copy_opts, arv_cmd.retry_opt])
99     args = parser.parse_args()
100
101     if args.verbose:
102         logger.setLevel(logging.DEBUG)
103     else:
104         logger.setLevel(logging.INFO)
105
106     # Create API clients for the source and destination instances
107     src_arv = api_for_instance(args.source_arvados)
108     dst_arv = api_for_instance(args.destination_arvados)
109
110     if not args.project_uuid:
111         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
112
113     # Identify the kind of object we have been given, and begin copying.
114     t = uuid_type(src_arv, args.object_uuid)
115     if t == 'Collection':
116         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
117         result = copy_collection(args.object_uuid,
118                                  src_arv, dst_arv,
119                                  args)
120     elif t == 'PipelineInstance':
121         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
122         result = copy_pipeline_instance(args.object_uuid,
123                                         src_arv, dst_arv,
124                                         args)
125     elif t == 'PipelineTemplate':
126         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
127         result = copy_pipeline_template(args.object_uuid,
128                                         src_arv, dst_arv, args)
129     else:
130         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
131
132     # Clean up any outstanding temp git repositories.
133     for d in local_repo_dir.values():
134         shutil.rmtree(d, ignore_errors=True)
135
136     # If no exception was thrown and the response does not have an
137     # error_token field, presume success
138     if 'error_token' in result or 'uuid' not in result:
139         logger.error("API server returned an error result: {}".format(result))
140         exit(1)
141
142     logger.info("")
143     logger.info("Success: created copy with uuid {}".format(result['uuid']))
144     exit(0)
145
146 def set_src_owner_uuid(resource, uuid, args):
147     global src_owner_uuid
148     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
149     src_owner_uuid = c.get("owner_uuid")
150
151 # api_for_instance(instance_name)
152 #
153 #     Creates an API client for the Arvados instance identified by
154 #     instance_name.
155 #
156 #     If instance_name contains a slash, it is presumed to be a path
157 #     (either local or absolute) to a file with Arvados configuration
158 #     settings.
159 #
160 #     Otherwise, it is presumed to be the name of a file in
161 #     $HOME/.config/arvados/instance_name.conf
162 #
163 def api_for_instance(instance_name):
164     if '/' in instance_name:
165         config_file = instance_name
166     else:
167         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
168
169     try:
170         cfg = arvados.config.load(config_file)
171     except (IOError, OSError) as e:
172         abort(("Could not open config file {}: {}\n" +
173                "You must make sure that your configuration tokens\n" +
174                "for Arvados instance {} are in {} and that this\n" +
175                "file is readable.").format(
176                    config_file, e, instance_name, config_file))
177
178     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
179         api_is_insecure = (
180             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
181                 ['1', 't', 'true', 'y', 'yes']))
182         client = arvados.api('v1',
183                              host=cfg['ARVADOS_API_HOST'],
184                              token=cfg['ARVADOS_API_TOKEN'],
185                              insecure=api_is_insecure,
186                              model=OrderedJsonModel())
187     else:
188         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
189     return client
190
191 # Check if git is available
192 def check_git_availability():
193     try:
194         arvados.util.run_command(['git', '--help'])
195     except Exception:
196         abort('git command is not available. Please ensure git is installed.')
197
198 # copy_pipeline_instance(pi_uuid, src, dst, args)
199 #
200 #    Copies a pipeline instance identified by pi_uuid from src to dst.
201 #
202 #    If the args.recursive option is set:
203 #      1. Copies all input collections
204 #           * For each component in the pipeline, include all collections
205 #             listed as job dependencies for that component)
206 #      2. Copy docker images
207 #      3. Copy git repositories
208 #      4. Copy the pipeline template
209 #
210 #    The only changes made to the copied pipeline instance are:
211 #      1. The original pipeline instance UUID is preserved in
212 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
213 #      2. The pipeline_template_uuid is changed to the new template uuid.
214 #      3. The owner_uuid of the instance is changed to the user who
215 #         copied it.
216 #
217 def copy_pipeline_instance(pi_uuid, src, dst, args):
218     # Fetch the pipeline instance record.
219     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
220
221     if args.recursive:
222         check_git_availability()
223
224         if not args.dst_git_repo:
225             abort('--dst-git-repo is required when copying a pipeline recursively.')
226         # Copy the pipeline template and save the copied template.
227         if pi.get('pipeline_template_uuid', None):
228             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
229                                         src, dst, args)
230
231         # Copy input collections, docker images and git repos.
232         pi = copy_collections(pi, src, dst, args)
233         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
234         copy_docker_images(pi, src, dst, args)
235
236         # Update the fields of the pipeline instance with the copied
237         # pipeline template.
238         if pi.get('pipeline_template_uuid', None):
239             pi['pipeline_template_uuid'] = pt['uuid']
240
241     else:
242         # not recursive
243         logger.info("Copying only pipeline instance %s.", pi_uuid)
244         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
245
246     # Update the pipeline instance properties, and create the new
247     # instance at dst.
248     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
249     pi['description'] = "Pipeline copied from {}\n\n{}".format(
250         pi_uuid,
251         pi['description'] if pi.get('description', None) else '')
252
253     pi['owner_uuid'] = args.project_uuid
254
255     del pi['uuid']
256
257     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
258     return new_pi
259
260 # copy_pipeline_template(pt_uuid, src, dst, args)
261 #
262 #    Copies a pipeline template identified by pt_uuid from src to dst.
263 #
264 #    If args.recursive is True, also copy any collections, docker
265 #    images and git repositories that this template references.
266 #
267 #    The owner_uuid of the new template is changed to that of the user
268 #    who copied the template.
269 #
270 #    Returns the copied pipeline template object.
271 #
272 def copy_pipeline_template(pt_uuid, src, dst, args):
273     # fetch the pipeline template from the source instance
274     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
275
276     if args.recursive:
277         check_git_availability()
278
279         if not args.dst_git_repo:
280             abort('--dst-git-repo is required when copying a pipeline recursively.')
281         # Copy input collections, docker images and git repos.
282         pt = copy_collections(pt, src, dst, args)
283         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
284         copy_docker_images(pt, src, dst, args)
285
286     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
287         pt_uuid,
288         pt['description'] if pt.get('description', None) else '')
289     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
290     del pt['uuid']
291
292     pt['owner_uuid'] = args.project_uuid
293
294     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
295
296 # copy_collections(obj, src, dst, args)
297 #
298 #    Recursively copies all collections referenced by 'obj' from src
299 #    to dst.  obj may be a dict or a list, in which case we run
300 #    copy_collections on every value it contains. If it is a string,
301 #    search it for any substring that matches a collection hash or uuid
302 #    (this will find hidden references to collections like
303 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
304 #
305 #    Returns a copy of obj with any old collection uuids replaced by
306 #    the new ones.
307 #
308 def copy_collections(obj, src, dst, args):
309
310     def copy_collection_fn(collection_match):
311         """Helper function for regex substitution: copies a single collection,
312         identified by the collection_match MatchObject, to the
313         destination.  Returns the destination collection uuid (or the
314         portable data hash if that's what src_id is).
315
316         """
317         src_id = collection_match.group(0)
318         if src_id not in collections_copied:
319             dst_col = copy_collection(src_id, src, dst, args)
320             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
321                 collections_copied[src_id] = src_id
322             else:
323                 collections_copied[src_id] = dst_col['uuid']
324         return collections_copied[src_id]
325
326     if isinstance(obj, basestring):
327         # Copy any collections identified in this string to dst, replacing
328         # them with the dst uuids as necessary.
329         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
330         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
331         return obj
332     elif isinstance(obj, dict):
333         return {v: copy_collections(obj[v], src, dst, args) for v in obj}
334     elif isinstance(obj, list):
335         return [copy_collections(v, src, dst, args) for v in obj]
336     return obj
337
338 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
339     """Copy a job's script to the destination repository, and update its record.
340
341     Given a jobspec dictionary, this function finds the referenced script from
342     src and copies it to dst and dst_repo.  It also updates jobspec in place to
343     refer to names on the destination.
344     """
345     repo = jobspec.get('repository')
346     if repo is None:
347         return
348     # script_version is the "script_version" parameter from the source
349     # component or job.  If no script_version was supplied in the
350     # component or job, it is a mistake in the pipeline, but for the
351     # purposes of copying the repository, default to "master".
352     script_version = jobspec.get('script_version') or 'master'
353     script_key = (repo, script_version)
354     if script_key not in scripts_copied:
355         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
356         scripts_copied.add(script_key)
357     jobspec['repository'] = dst_repo
358     repo_dir = local_repo_dir[repo]
359     for version_key in ['script_version', 'supplied_script_version']:
360         if version_key in jobspec:
361             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
362
363 # copy_git_repos(p, src, dst, dst_repo, args)
364 #
365 #    Copies all git repositories referenced by pipeline instance or
366 #    template 'p' from src to dst.
367 #
368 #    For each component c in the pipeline:
369 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
370 #      * Rename script versions:
371 #          * c['script_version']
372 #          * c['job']['script_version']
373 #          * c['job']['supplied_script_version']
374 #        to the commit hashes they resolve to, since any symbolic
375 #        names (tags, branches) are not preserved in the destination repo.
376 #
377 #    The pipeline object is updated in place with the new repository
378 #    names.  The return value is undefined.
379 #
380 def copy_git_repos(p, src, dst, dst_repo, args):
381     for component in p['components'].itervalues():
382         migrate_jobspec(component, src, dst, dst_repo, args)
383         if 'job' in component:
384             migrate_jobspec(component['job'], src, dst, dst_repo, args)
385
386 def total_collection_size(manifest_text):
387     """Return the total number of bytes in this collection (excluding
388     duplicate blocks)."""
389
390     total_bytes = 0
391     locators_seen = {}
392     for line in manifest_text.splitlines():
393         words = line.split()
394         for word in words[1:]:
395             try:
396                 loc = arvados.KeepLocator(word)
397             except ValueError:
398                 continue  # this word isn't a locator, skip it
399             if loc.md5sum not in locators_seen:
400                 locators_seen[loc.md5sum] = True
401                 total_bytes += loc.size
402
403     return total_bytes
404
405 def create_collection_from(c, src, dst, args):
406     """Create a new collection record on dst, and copy Docker metadata if
407     available."""
408
409     collection_uuid = c['uuid']
410     del c['uuid']
411
412     if not c["name"]:
413         c['name'] = "copied from " + collection_uuid
414
415     if 'properties' in c:
416         del c['properties']
417
418     c['owner_uuid'] = args.project_uuid
419
420     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
421
422     # Create docker_image_repo+tag and docker_image_hash links
423     # at the destination.
424     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
425         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
426
427         for src_link in docker_links:
428             body = {key: src_link[key]
429                     for key in ['link_class', 'name', 'properties']}
430             body['head_uuid'] = dst_collection['uuid']
431             body['owner_uuid'] = args.project_uuid
432
433             lk = dst.links().create(body=body).execute(num_retries=args.retries)
434             logger.debug('created dst link {}'.format(lk))
435
436     return dst_collection
437
438 # copy_collection(obj_uuid, src, dst, args)
439 #
440 #    Copies the collection identified by obj_uuid from src to dst.
441 #    Returns the collection object created at dst.
442 #
443 #    If args.progress is True, produce a human-friendly progress
444 #    report.
445 #
446 #    If a collection with the desired portable_data_hash already
447 #    exists at dst, and args.force is False, copy_collection returns
448 #    the existing collection without copying any blocks.  Otherwise
449 #    (if no collection exists or if args.force is True)
450 #    copy_collection copies all of the collection data blocks from src
451 #    to dst.
452 #
453 #    For this application, it is critical to preserve the
454 #    collection's manifest hash, which is not guaranteed with the
455 #    arvados.CollectionReader and arvados.CollectionWriter classes.
456 #    Copying each block in the collection manually, followed by
457 #    the manifest block, ensures that the collection's manifest
458 #    hash will not change.
459 #
460 def copy_collection(obj_uuid, src, dst, args):
461     if arvados.util.keep_locator_pattern.match(obj_uuid):
462         # If the obj_uuid is a portable data hash, it might not be uniquely
463         # identified with a particular collection.  As a result, it is
464         # ambigious as to what name to use for the copy.  Apply some heuristics
465         # to pick which collection to get the name from.
466         srccol = src.collections().list(
467             filters=[['portable_data_hash', '=', obj_uuid]],
468             order="created_at asc"
469             ).execute(num_retries=args.retries)
470
471         items = srccol.get("items")
472
473         if not items:
474             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
475             return
476
477         c = None
478
479         if len(items) == 1:
480             # There's only one collection with the PDH, so use that.
481             c = items[0]
482         if not c:
483             # See if there is a collection that's in the same project
484             # as the root item (usually a pipeline) being copied.
485             for i in items:
486                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
487                     c = i
488                     break
489         if not c:
490             # Didn't find any collections located in the same project, so
491             # pick the oldest collection that has a name assigned to it.
492             for i in items:
493                 if i.get("name"):
494                     c = i
495                     break
496         if not c:
497             # None of the collections have names (?!), so just pick the
498             # first one.
499             c = items[0]
500
501         # list() doesn't return manifest text (and we don't want it to,
502         # because we don't need the same maninfest text sent to us 50
503         # times) so go and retrieve the collection object directly
504         # which will include the manifest text.
505         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
506     else:
507         # Assume this is an actual collection uuid, so fetch it directly.
508         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
509
510     # If a collection with this hash already exists at the
511     # destination, and 'force' is not true, just return that
512     # collection.
513     if not args.force:
514         if 'portable_data_hash' in c:
515             colhash = c['portable_data_hash']
516         else:
517             colhash = c['uuid']
518         dstcol = dst.collections().list(
519             filters=[['portable_data_hash', '=', colhash]]
520         ).execute(num_retries=args.retries)
521         if dstcol['items_available'] > 0:
522             for d in dstcol['items']:
523                 if ((args.project_uuid == d['owner_uuid']) and
524                     (c.get('name') == d['name']) and
525                     (c['portable_data_hash'] == d['portable_data_hash'])):
526                     return d
527             c['manifest_text'] = dst.collections().get(
528                 uuid=dstcol['items'][0]['uuid']
529             ).execute(num_retries=args.retries)['manifest_text']
530             return create_collection_from(c, src, dst, args)
531
532     # Fetch the collection's manifest.
533     manifest = c['manifest_text']
534     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
535
536     # Copy each block from src_keep to dst_keep.
537     # Use the newly signed locators returned from dst_keep to build
538     # a new manifest as we go.
539     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
540     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
541     dst_manifest = ""
542     dst_locators = {}
543     bytes_written = 0
544     bytes_expected = total_collection_size(manifest)
545     if args.progress:
546         progress_writer = ProgressWriter(human_progress)
547     else:
548         progress_writer = None
549
550     for line in manifest.splitlines(True):
551         words = line.split()
552         dst_manifest_line = words[0]
553         for word in words[1:]:
554             try:
555                 loc = arvados.KeepLocator(word)
556                 blockhash = loc.md5sum
557                 # copy this block if we haven't seen it before
558                 # (otherwise, just reuse the existing dst_locator)
559                 if blockhash not in dst_locators:
560                     logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
561                     if progress_writer:
562                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
563                     data = src_keep.get(word)
564                     dst_locator = dst_keep.put(data)
565                     dst_locators[blockhash] = dst_locator
566                     bytes_written += loc.size
567                 dst_manifest_line += ' ' + dst_locators[blockhash]
568             except ValueError:
569                 # If 'word' can't be parsed as a locator,
570                 # presume it's a filename.
571                 dst_manifest_line += ' ' + word
572         dst_manifest += dst_manifest_line
573         if line.endswith("\n"):
574             dst_manifest += "\n"
575
576     if progress_writer:
577         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
578         progress_writer.finish()
579
580     # Copy the manifest and save the collection.
581     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
582
583     dst_keep.put(dst_manifest.encode('utf-8'))
584     c['manifest_text'] = dst_manifest
585     return create_collection_from(c, src, dst, args)
586
587 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
588 #
589 #    Copies commits from git repository 'src_git_repo' on Arvados
590 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
591 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
592 #    or "jsmith")
593 #
594 #    All commits will be copied to a destination branch named for the
595 #    source repository URL.
596 #
597 #    The destination repository must already exist.
598 #
599 #    The user running this command must be authenticated
600 #    to both repositories.
601 #
602 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
603     # Identify the fetch and push URLs for the git repositories.
604     r = src.repositories().list(
605         filters=[['name', '=', src_git_repo]]).execute(num_retries=args.retries)
606     if r['items_available'] != 1:
607         raise Exception('cannot identify source repo {}; {} repos found'
608                         .format(src_git_repo, r['items_available']))
609     src_git_url = r['items'][0]['fetch_url']
610     logger.debug('src_git_url: {}'.format(src_git_url))
611
612     r = dst.repositories().list(
613         filters=[['name', '=', dst_git_repo]]).execute(num_retries=args.retries)
614     if r['items_available'] != 1:
615         raise Exception('cannot identify destination repo {}; {} repos found'
616                         .format(dst_git_repo, r['items_available']))
617     dst_git_push_url  = r['items'][0]['push_url']
618     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
619
620     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
621
622     # Copy git commits from src repo to dst repo.
623     if src_git_repo not in local_repo_dir:
624         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
625         arvados.util.run_command(
626             ["git", "clone", "--bare", src_git_url,
627              local_repo_dir[src_git_repo]],
628             cwd=os.path.dirname(local_repo_dir[src_git_repo]))
629         arvados.util.run_command(
630             ["git", "remote", "add", "dst", dst_git_push_url],
631             cwd=local_repo_dir[src_git_repo])
632     arvados.util.run_command(
633         ["git", "branch", dst_branch, script_version],
634         cwd=local_repo_dir[src_git_repo])
635     arvados.util.run_command(["git", "push", "dst", dst_branch],
636                              cwd=local_repo_dir[src_git_repo])
637
638 def copy_docker_images(pipeline, src, dst, args):
639     """Copy any docker images named in the pipeline components'
640     runtime_constraints field from src to dst."""
641
642     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
643     for c_name, c_info in pipeline['components'].iteritems():
644         if ('runtime_constraints' in c_info and
645             'docker_image' in c_info['runtime_constraints']):
646             copy_docker_image(
647                 c_info['runtime_constraints']['docker_image'],
648                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
649                 src, dst, args)
650
651
652 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
653     """Copy the docker image identified by docker_image and
654     docker_image_tag from src to dst. Create appropriate
655     docker_image_repo+tag and docker_image_hash links at dst.
656
657     """
658
659     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
660
661     # Find the link identifying this docker image.
662     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
663         src, args.retries, docker_image, docker_image_tag)
664     if docker_image_list:
665         image_uuid, image_info = docker_image_list[0]
666         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
667
668         # Copy the collection it refers to.
669         dst_image_col = copy_collection(image_uuid, src, dst, args)
670     elif arvados.util.keep_locator_pattern.match(docker_image):
671         dst_image_col = copy_collection(docker_image, src, dst, args)
672     else:
673         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
674
675 # git_rev_parse(rev, repo)
676 #
677 #    Returns the 40-character commit hash corresponding to 'rev' in
678 #    git repository 'repo' (which must be the path of a local git
679 #    repository)
680 #
681 def git_rev_parse(rev, repo):
682     gitout, giterr = arvados.util.run_command(
683         ['git', 'rev-parse', rev], cwd=repo)
684     return gitout.strip()
685
686 # uuid_type(api, object_uuid)
687 #
688 #    Returns the name of the class that object_uuid belongs to, based on
689 #    the second field of the uuid.  This function consults the api's
690 #    schema to identify the object class.
691 #
692 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
693 #
694 #    Special case: if handed a Keep locator hash, return 'Collection'.
695 #
696 def uuid_type(api, object_uuid):
697     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
698         return 'Collection'
699     p = object_uuid.split('-')
700     if len(p) == 3:
701         type_prefix = p[1]
702         for k in api._schema.schemas:
703             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
704             if type_prefix == obj_class:
705                 return k
706     return None
707
708 def abort(msg, code=1):
709     logger.info("arv-copy: %s", msg)
710     exit(code)
711
712
713 # Code for reporting on the progress of a collection upload.
714 # Stolen from arvados.commands.put.ArvPutCollectionWriter
715 # TODO(twp): figure out how to refactor into a shared library
716 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
717 # code)
718
719 def machine_progress(obj_uuid, bytes_written, bytes_expected):
720     return "{} {}: {} {} written {} total\n".format(
721         sys.argv[0],
722         os.getpid(),
723         obj_uuid,
724         bytes_written,
725         -1 if (bytes_expected is None) else bytes_expected)
726
727 def human_progress(obj_uuid, bytes_written, bytes_expected):
728     if bytes_expected:
729         return "\r{}: {}M / {}M {:.1%} ".format(
730             obj_uuid,
731             bytes_written >> 20, bytes_expected >> 20,
732             float(bytes_written) / bytes_expected)
733     else:
734         return "\r{}: {} ".format(obj_uuid, bytes_written)
735
736 class ProgressWriter(object):
737     _progress_func = None
738     outfile = sys.stderr
739
740     def __init__(self, progress_func):
741         self._progress_func = progress_func
742
743     def report(self, obj_uuid, bytes_written, bytes_expected):
744         if self._progress_func is not None:
745             self.outfile.write(
746                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
747
748     def finish(self):
749         self.outfile.write("\n")
750
751 if __name__ == '__main__':
752     main()