cf054a3cc07cc341698e28423e74744ee54b3df9
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import contextlib
21 import functools
22 import getpass
23 import os
24 import re
25 import shutil
26 import sys
27 import logging
28 import tempfile
29 import urlparse
30
31 import arvados
32 import arvados.config
33 import arvados.keep
34 import arvados.util
35 import arvados.commands._util as arv_cmd
36 import arvados.commands.keepdocker
37
38 from arvados.api import OrderedJsonModel
39
40 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
41
42 logger = logging.getLogger('arvados.arv-copy')
43
44 # local_repo_dir records which git repositories from the Arvados source
45 # instance have been checked out locally during this run, and to which
46 # directories.
47 # e.g. if repository 'twp' from src_arv has been cloned into
48 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
49 #
50 local_repo_dir = {}
51
52 # List of collections that have been copied in this session, and their
53 # destination collection UUIDs.
54 collections_copied = {}
55
56 # Set of (repository, script_version) two-tuples of commits copied in git.
57 scripts_copied = set()
58
59 # The owner_uuid of the object being copied
60 src_owner_uuid = None
61
62 def main():
63     copy_opts = argparse.ArgumentParser(add_help=False)
64
65     copy_opts.add_argument(
66         '-v', '--verbose', dest='verbose', action='store_true',
67         help='Verbose output.')
68     copy_opts.add_argument(
69         '--progress', dest='progress', action='store_true',
70         help='Report progress on copying collections. (default)')
71     copy_opts.add_argument(
72         '--no-progress', dest='progress', action='store_false',
73         help='Do not report progress on copying collections.')
74     copy_opts.add_argument(
75         '-f', '--force', dest='force', action='store_true',
76         help='Perform copy even if the object appears to exist at the remote destination.')
77     copy_opts.add_argument(
78         '--force-filters', action='store_true', default=False,
79         help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
80     copy_opts.add_argument(
81         '--src', dest='source_arvados', required=True,
82         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
83     copy_opts.add_argument(
84         '--dst', dest='destination_arvados', required=True,
85         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
86     copy_opts.add_argument(
87         '--recursive', dest='recursive', action='store_true',
88         help='Recursively copy any dependencies for this object. (default)')
89     copy_opts.add_argument(
90         '--no-recursive', dest='recursive', action='store_false',
91         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
92     copy_opts.add_argument(
93         '--dst-git-repo', dest='dst_git_repo',
94         help='The name of the destination git repository. Required when copying a pipeline recursively.')
95     copy_opts.add_argument(
96         '--project-uuid', dest='project_uuid',
97         help='The UUID of the project at the destination to which the pipeline should be copied.')
98     copy_opts.add_argument(
99         '--allow-git-http-src', action="store_true",
100         help='Allow cloning git repositories over insecure http')
101     copy_opts.add_argument(
102         '--allow-git-http-dst', action="store_true",
103         help='Allow pushing git repositories over insecure http')
104
105     copy_opts.add_argument(
106         'object_uuid',
107         help='The UUID of the object to be copied.')
108     copy_opts.set_defaults(progress=True)
109     copy_opts.set_defaults(recursive=True)
110
111     parser = argparse.ArgumentParser(
112         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
113         parents=[copy_opts, arv_cmd.retry_opt])
114     args = parser.parse_args()
115
116     if args.verbose:
117         logger.setLevel(logging.DEBUG)
118     else:
119         logger.setLevel(logging.INFO)
120
121     # Create API clients for the source and destination instances
122     src_arv = api_for_instance(args.source_arvados)
123     dst_arv = api_for_instance(args.destination_arvados)
124
125     if not args.project_uuid:
126         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
127
128     # Identify the kind of object we have been given, and begin copying.
129     t = uuid_type(src_arv, args.object_uuid)
130     if t == 'Collection':
131         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
132         result = copy_collection(args.object_uuid,
133                                  src_arv, dst_arv,
134                                  args)
135     elif t == 'PipelineInstance':
136         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
137         result = copy_pipeline_instance(args.object_uuid,
138                                         src_arv, dst_arv,
139                                         args)
140     elif t == 'PipelineTemplate':
141         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
142         result = copy_pipeline_template(args.object_uuid,
143                                         src_arv, dst_arv, args)
144     else:
145         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
146
147     # Clean up any outstanding temp git repositories.
148     for d in local_repo_dir.values():
149         shutil.rmtree(d, ignore_errors=True)
150
151     # If no exception was thrown and the response does not have an
152     # error_token field, presume success
153     if 'error_token' in result or 'uuid' not in result:
154         logger.error("API server returned an error result: {}".format(result))
155         exit(1)
156
157     logger.info("")
158     logger.info("Success: created copy with uuid {}".format(result['uuid']))
159     exit(0)
160
161 def set_src_owner_uuid(resource, uuid, args):
162     global src_owner_uuid
163     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
164     src_owner_uuid = c.get("owner_uuid")
165
166 # api_for_instance(instance_name)
167 #
168 #     Creates an API client for the Arvados instance identified by
169 #     instance_name.
170 #
171 #     If instance_name contains a slash, it is presumed to be a path
172 #     (either local or absolute) to a file with Arvados configuration
173 #     settings.
174 #
175 #     Otherwise, it is presumed to be the name of a file in
176 #     $HOME/.config/arvados/instance_name.conf
177 #
178 def api_for_instance(instance_name):
179     if '/' in instance_name:
180         config_file = instance_name
181     else:
182         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
183
184     try:
185         cfg = arvados.config.load(config_file)
186     except (IOError, OSError) as e:
187         abort(("Could not open config file {}: {}\n" +
188                "You must make sure that your configuration tokens\n" +
189                "for Arvados instance {} are in {} and that this\n" +
190                "file is readable.").format(
191                    config_file, e, instance_name, config_file))
192
193     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
194         api_is_insecure = (
195             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
196                 ['1', 't', 'true', 'y', 'yes']))
197         client = arvados.api('v1',
198                              host=cfg['ARVADOS_API_HOST'],
199                              token=cfg['ARVADOS_API_TOKEN'],
200                              insecure=api_is_insecure,
201                              model=OrderedJsonModel())
202     else:
203         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
204     return client
205
206 # Check if git is available
207 def check_git_availability():
208     try:
209         arvados.util.run_command(['git', '--help'])
210     except Exception:
211         abort('git command is not available. Please ensure git is installed.')
212
213 # copy_pipeline_instance(pi_uuid, src, dst, args)
214 #
215 #    Copies a pipeline instance identified by pi_uuid from src to dst.
216 #
217 #    If the args.recursive option is set:
218 #      1. Copies all input collections
219 #           * For each component in the pipeline, include all collections
220 #             listed as job dependencies for that component)
221 #      2. Copy docker images
222 #      3. Copy git repositories
223 #      4. Copy the pipeline template
224 #
225 #    The only changes made to the copied pipeline instance are:
226 #      1. The original pipeline instance UUID is preserved in
227 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
228 #      2. The pipeline_template_uuid is changed to the new template uuid.
229 #      3. The owner_uuid of the instance is changed to the user who
230 #         copied it.
231 #
232 def copy_pipeline_instance(pi_uuid, src, dst, args):
233     # Fetch the pipeline instance record.
234     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
235
236     if args.recursive:
237         check_git_availability()
238
239         if not args.dst_git_repo:
240             abort('--dst-git-repo is required when copying a pipeline recursively.')
241         # Copy the pipeline template and save the copied template.
242         if pi.get('pipeline_template_uuid', None):
243             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
244                                         src, dst, args)
245
246         # Copy input collections, docker images and git repos.
247         pi = copy_collections(pi, src, dst, args)
248         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
249         copy_docker_images(pi, src, dst, args)
250
251         # Update the fields of the pipeline instance with the copied
252         # pipeline template.
253         if pi.get('pipeline_template_uuid', None):
254             pi['pipeline_template_uuid'] = pt['uuid']
255
256     else:
257         # not recursive
258         logger.info("Copying only pipeline instance %s.", pi_uuid)
259         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
260
261     # Update the pipeline instance properties, and create the new
262     # instance at dst.
263     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
264     pi['description'] = "Pipeline copied from {}\n\n{}".format(
265         pi_uuid,
266         pi['description'] if pi.get('description', None) else '')
267
268     pi['owner_uuid'] = args.project_uuid
269
270     del pi['uuid']
271
272     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
273     return new_pi
274
275 def filter_iter(arg):
276     """Iterate a filter string-or-list.
277
278     Pass in a filter field that can either be a string or list.
279     This will iterate elements as if the field had been written as a list.
280     """
281     if isinstance(arg, basestring):
282         return iter((arg,))
283     else:
284         return iter(arg)
285
286 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
287     """Update a single repository filter in-place for the destination.
288
289     If the filter checks that the repository is src_repository, it is
290     updated to check that the repository is dst_repository.  If it does
291     anything else, this function raises ValueError.
292     """
293     if src_repository is None:
294         raise ValueError("component does not specify a source repository")
295     elif dst_repository is None:
296         raise ValueError("no destination repository specified to update repository filter")
297     elif repo_filter == ['repository', '=', src_repository]:
298         repo_filter[2] = dst_repository
299     elif repo_filter == ['repository', 'in', [src_repository]]:
300         repo_filter[2] = [dst_repository]
301     else:
302         raise ValueError("repository filter is not a simple source match")
303
304 def migrate_script_version_filter(version_filter):
305     """Update a single script_version filter in-place for the destination.
306
307     Currently this function checks that all the filter operands are Git
308     commit hashes.  If they're not, it raises ValueError to indicate that
309     the filter is not portable.  It could be extended to make other
310     transformations in the future.
311     """
312     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
313         raise ValueError("script_version filter is not limited to commit hashes")
314
315 def attr_filtered(filter_, *attr_names):
316     """Return True if filter_ applies to any of attr_names, else False."""
317     return any((name == 'any') or (name in attr_names)
318                for name in filter_iter(filter_[0]))
319
320 @contextlib.contextmanager
321 def exception_handler(handler, *exc_types):
322     """If any exc_types are raised in the block, call handler on the exception."""
323     try:
324         yield
325     except exc_types as error:
326         handler(error)
327
328 def migrate_components_filters(template_components, dst_git_repo):
329     """Update template component filters in-place for the destination.
330
331     template_components is a dictionary of components in a pipeline template.
332     This method walks over each component's filters, and updates them to have
333     identical semantics on the destination cluster.  It returns a list of
334     error strings that describe what filters could not be updated safely.
335
336     dst_git_repo is the name of the destination Git repository, which can
337     be None if that is not known.
338     """
339     errors = []
340     for cname, cspec in template_components.iteritems():
341         def add_error(errmsg):
342             errors.append("{}: {}".format(cname, errmsg))
343         if not isinstance(cspec, dict):
344             add_error("value is not a component definition")
345             continue
346         src_repository = cspec.get('repository')
347         filters = cspec.get('filters', [])
348         if not isinstance(filters, list):
349             add_error("filters are not a list")
350             continue
351         for cfilter in filters:
352             if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
353                 add_error("malformed filter {!r}".format(cfilter))
354                 continue
355             if attr_filtered(cfilter, 'repository'):
356                 with exception_handler(add_error, ValueError):
357                     migrate_repository_filter(cfilter, src_repository, dst_git_repo)
358             if attr_filtered(cfilter, 'script_version'):
359                 with exception_handler(add_error, ValueError):
360                     migrate_script_version_filter(cfilter)
361     return errors
362
363 # copy_pipeline_template(pt_uuid, src, dst, args)
364 #
365 #    Copies a pipeline template identified by pt_uuid from src to dst.
366 #
367 #    If args.recursive is True, also copy any collections, docker
368 #    images and git repositories that this template references.
369 #
370 #    The owner_uuid of the new template is changed to that of the user
371 #    who copied the template.
372 #
373 #    Returns the copied pipeline template object.
374 #
375 def copy_pipeline_template(pt_uuid, src, dst, args):
376     # fetch the pipeline template from the source instance
377     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
378
379     if not args.force_filters:
380         filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
381         if filter_errors:
382             abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
383                   "\n".join(filter_errors))
384
385     if args.recursive:
386         check_git_availability()
387
388         if not args.dst_git_repo:
389             abort('--dst-git-repo is required when copying a pipeline recursively.')
390         # Copy input collections, docker images and git repos.
391         pt = copy_collections(pt, src, dst, args)
392         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
393         copy_docker_images(pt, src, dst, args)
394
395     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
396         pt_uuid,
397         pt['description'] if pt.get('description', None) else '')
398     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
399     del pt['uuid']
400
401     pt['owner_uuid'] = args.project_uuid
402
403     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
404
405 # copy_collections(obj, src, dst, args)
406 #
407 #    Recursively copies all collections referenced by 'obj' from src
408 #    to dst.  obj may be a dict or a list, in which case we run
409 #    copy_collections on every value it contains. If it is a string,
410 #    search it for any substring that matches a collection hash or uuid
411 #    (this will find hidden references to collections like
412 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
413 #
414 #    Returns a copy of obj with any old collection uuids replaced by
415 #    the new ones.
416 #
417 def copy_collections(obj, src, dst, args):
418
419     def copy_collection_fn(collection_match):
420         """Helper function for regex substitution: copies a single collection,
421         identified by the collection_match MatchObject, to the
422         destination.  Returns the destination collection uuid (or the
423         portable data hash if that's what src_id is).
424
425         """
426         src_id = collection_match.group(0)
427         if src_id not in collections_copied:
428             dst_col = copy_collection(src_id, src, dst, args)
429             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
430                 collections_copied[src_id] = src_id
431             else:
432                 collections_copied[src_id] = dst_col['uuid']
433         return collections_copied[src_id]
434
435     if isinstance(obj, basestring):
436         # Copy any collections identified in this string to dst, replacing
437         # them with the dst uuids as necessary.
438         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
439         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
440         return obj
441     elif isinstance(obj, dict):
442         return type(obj)((v, copy_collections(obj[v], src, dst, args))
443                          for v in obj)
444     elif isinstance(obj, list):
445         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
446     return obj
447
448 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
449     """Copy a job's script to the destination repository, and update its record.
450
451     Given a jobspec dictionary, this function finds the referenced script from
452     src and copies it to dst and dst_repo.  It also updates jobspec in place to
453     refer to names on the destination.
454     """
455     repo = jobspec.get('repository')
456     if repo is None:
457         return
458     # script_version is the "script_version" parameter from the source
459     # component or job.  If no script_version was supplied in the
460     # component or job, it is a mistake in the pipeline, but for the
461     # purposes of copying the repository, default to "master".
462     script_version = jobspec.get('script_version') or 'master'
463     script_key = (repo, script_version)
464     if script_key not in scripts_copied:
465         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
466         scripts_copied.add(script_key)
467     jobspec['repository'] = dst_repo
468     repo_dir = local_repo_dir[repo]
469     for version_key in ['script_version', 'supplied_script_version']:
470         if version_key in jobspec:
471             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
472
473 # copy_git_repos(p, src, dst, dst_repo, args)
474 #
475 #    Copies all git repositories referenced by pipeline instance or
476 #    template 'p' from src to dst.
477 #
478 #    For each component c in the pipeline:
479 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
480 #      * Rename script versions:
481 #          * c['script_version']
482 #          * c['job']['script_version']
483 #          * c['job']['supplied_script_version']
484 #        to the commit hashes they resolve to, since any symbolic
485 #        names (tags, branches) are not preserved in the destination repo.
486 #
487 #    The pipeline object is updated in place with the new repository
488 #    names.  The return value is undefined.
489 #
490 def copy_git_repos(p, src, dst, dst_repo, args):
491     for component in p['components'].itervalues():
492         migrate_jobspec(component, src, dst, dst_repo, args)
493         if 'job' in component:
494             migrate_jobspec(component['job'], src, dst, dst_repo, args)
495
496 def total_collection_size(manifest_text):
497     """Return the total number of bytes in this collection (excluding
498     duplicate blocks)."""
499
500     total_bytes = 0
501     locators_seen = {}
502     for line in manifest_text.splitlines():
503         words = line.split()
504         for word in words[1:]:
505             try:
506                 loc = arvados.KeepLocator(word)
507             except ValueError:
508                 continue  # this word isn't a locator, skip it
509             if loc.md5sum not in locators_seen:
510                 locators_seen[loc.md5sum] = True
511                 total_bytes += loc.size
512
513     return total_bytes
514
515 def create_collection_from(c, src, dst, args):
516     """Create a new collection record on dst, and copy Docker metadata if
517     available."""
518
519     collection_uuid = c['uuid']
520     del c['uuid']
521
522     if not c["name"]:
523         c['name'] = "copied from " + collection_uuid
524
525     if 'properties' in c:
526         del c['properties']
527
528     c['owner_uuid'] = args.project_uuid
529
530     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
531
532     # Create docker_image_repo+tag and docker_image_hash links
533     # at the destination.
534     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
535         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
536
537         for src_link in docker_links:
538             body = {key: src_link[key]
539                     for key in ['link_class', 'name', 'properties']}
540             body['head_uuid'] = dst_collection['uuid']
541             body['owner_uuid'] = args.project_uuid
542
543             lk = dst.links().create(body=body).execute(num_retries=args.retries)
544             logger.debug('created dst link {}'.format(lk))
545
546     return dst_collection
547
548 # copy_collection(obj_uuid, src, dst, args)
549 #
550 #    Copies the collection identified by obj_uuid from src to dst.
551 #    Returns the collection object created at dst.
552 #
553 #    If args.progress is True, produce a human-friendly progress
554 #    report.
555 #
556 #    If a collection with the desired portable_data_hash already
557 #    exists at dst, and args.force is False, copy_collection returns
558 #    the existing collection without copying any blocks.  Otherwise
559 #    (if no collection exists or if args.force is True)
560 #    copy_collection copies all of the collection data blocks from src
561 #    to dst.
562 #
563 #    For this application, it is critical to preserve the
564 #    collection's manifest hash, which is not guaranteed with the
565 #    arvados.CollectionReader and arvados.CollectionWriter classes.
566 #    Copying each block in the collection manually, followed by
567 #    the manifest block, ensures that the collection's manifest
568 #    hash will not change.
569 #
570 def copy_collection(obj_uuid, src, dst, args):
571     if arvados.util.keep_locator_pattern.match(obj_uuid):
572         # If the obj_uuid is a portable data hash, it might not be uniquely
573         # identified with a particular collection.  As a result, it is
574         # ambigious as to what name to use for the copy.  Apply some heuristics
575         # to pick which collection to get the name from.
576         srccol = src.collections().list(
577             filters=[['portable_data_hash', '=', obj_uuid]],
578             order="created_at asc"
579             ).execute(num_retries=args.retries)
580
581         items = srccol.get("items")
582
583         if not items:
584             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
585             return
586
587         c = None
588
589         if len(items) == 1:
590             # There's only one collection with the PDH, so use that.
591             c = items[0]
592         if not c:
593             # See if there is a collection that's in the same project
594             # as the root item (usually a pipeline) being copied.
595             for i in items:
596                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
597                     c = i
598                     break
599         if not c:
600             # Didn't find any collections located in the same project, so
601             # pick the oldest collection that has a name assigned to it.
602             for i in items:
603                 if i.get("name"):
604                     c = i
605                     break
606         if not c:
607             # None of the collections have names (?!), so just pick the
608             # first one.
609             c = items[0]
610
611         # list() doesn't return manifest text (and we don't want it to,
612         # because we don't need the same maninfest text sent to us 50
613         # times) so go and retrieve the collection object directly
614         # which will include the manifest text.
615         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
616     else:
617         # Assume this is an actual collection uuid, so fetch it directly.
618         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
619
620     # If a collection with this hash already exists at the
621     # destination, and 'force' is not true, just return that
622     # collection.
623     if not args.force:
624         if 'portable_data_hash' in c:
625             colhash = c['portable_data_hash']
626         else:
627             colhash = c['uuid']
628         dstcol = dst.collections().list(
629             filters=[['portable_data_hash', '=', colhash]]
630         ).execute(num_retries=args.retries)
631         if dstcol['items_available'] > 0:
632             for d in dstcol['items']:
633                 if ((args.project_uuid == d['owner_uuid']) and
634                     (c.get('name') == d['name']) and
635                     (c['portable_data_hash'] == d['portable_data_hash'])):
636                     return d
637             c['manifest_text'] = dst.collections().get(
638                 uuid=dstcol['items'][0]['uuid']
639             ).execute(num_retries=args.retries)['manifest_text']
640             return create_collection_from(c, src, dst, args)
641
642     # Fetch the collection's manifest.
643     manifest = c['manifest_text']
644     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
645
646     # Copy each block from src_keep to dst_keep.
647     # Use the newly signed locators returned from dst_keep to build
648     # a new manifest as we go.
649     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
650     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
651     dst_manifest = ""
652     dst_locators = {}
653     bytes_written = 0
654     bytes_expected = total_collection_size(manifest)
655     if args.progress:
656         progress_writer = ProgressWriter(human_progress)
657     else:
658         progress_writer = None
659
660     for line in manifest.splitlines():
661         words = line.split()
662         dst_manifest += words[0]
663         for word in words[1:]:
664             try:
665                 loc = arvados.KeepLocator(word)
666             except ValueError:
667                 # If 'word' can't be parsed as a locator,
668                 # presume it's a filename.
669                 dst_manifest += ' ' + word
670                 continue
671             blockhash = loc.md5sum
672             # copy this block if we haven't seen it before
673             # (otherwise, just reuse the existing dst_locator)
674             if blockhash not in dst_locators:
675                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
676                 if progress_writer:
677                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
678                 data = src_keep.get(word)
679                 dst_locator = dst_keep.put(data)
680                 dst_locators[blockhash] = dst_locator
681                 bytes_written += loc.size
682             dst_manifest += ' ' + dst_locators[blockhash]
683         dst_manifest += "\n"
684
685     if progress_writer:
686         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
687         progress_writer.finish()
688
689     # Copy the manifest and save the collection.
690     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
691
692     c['manifest_text'] = dst_manifest
693     return create_collection_from(c, src, dst, args)
694
695 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
696     r = api.repositories().list(
697         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
698     if r['items_available'] != 1:
699         raise Exception('cannot identify repo {}; {} repos found'
700                         .format(repo_name, r['items_available']))
701
702     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
703     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
704     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
705
706     priority = https_url + other_url + http_url
707
708     git_config = []
709     git_url = None
710     for url in priority:
711         if url.startswith("http"):
712             u = urlparse.urlsplit(url)
713             baseurl = urlparse.urlunsplit((u.scheme, u.netloc, "", "", ""))
714             git_config = ["-c", "credential.%s/.username=none" % baseurl,
715                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
716         else:
717             git_config = []
718
719         try:
720             logger.debug("trying %s", url)
721             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
722                                       env={"HOME": os.environ["HOME"],
723                                            "ARVADOS_API_TOKEN": api.api_token,
724                                            "GIT_ASKPASS": "/bin/false"})
725         except arvados.errors.CommandFailedError:
726             pass
727         else:
728             git_url = url
729             break
730
731     if not git_url:
732         raise Exception('Cannot access git repository, tried {}'
733                         .format(priority))
734
735     if git_url.startswith("http:"):
736         if allow_insecure_http:
737             logger.warn("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
738         else:
739             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
740
741     return (git_url, git_config)
742
743
744 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
745 #
746 #    Copies commits from git repository 'src_git_repo' on Arvados
747 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
748 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
749 #    or "jsmith")
750 #
751 #    All commits will be copied to a destination branch named for the
752 #    source repository URL.
753 #
754 #    The destination repository must already exist.
755 #
756 #    The user running this command must be authenticated
757 #    to both repositories.
758 #
759 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
760     # Identify the fetch and push URLs for the git repositories.
761
762     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
763     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
764
765     logger.debug('src_git_url: {}'.format(src_git_url))
766     logger.debug('dst_git_url: {}'.format(dst_git_url))
767
768     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
769
770     # Copy git commits from src repo to dst repo.
771     if src_git_repo not in local_repo_dir:
772         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
773         arvados.util.run_command(
774             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
775              local_repo_dir[src_git_repo]],
776             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
777             env={"HOME": os.environ["HOME"],
778                  "ARVADOS_API_TOKEN": src.api_token,
779                  "GIT_ASKPASS": "/bin/false"})
780         arvados.util.run_command(
781             ["git", "remote", "add", "dst", dst_git_url],
782             cwd=local_repo_dir[src_git_repo])
783     arvados.util.run_command(
784         ["git", "branch", dst_branch, script_version],
785         cwd=local_repo_dir[src_git_repo])
786     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
787                              cwd=local_repo_dir[src_git_repo],
788                              env={"HOME": os.environ["HOME"],
789                                   "ARVADOS_API_TOKEN": dst.api_token,
790                                   "GIT_ASKPASS": "/bin/false"})
791
792 def copy_docker_images(pipeline, src, dst, args):
793     """Copy any docker images named in the pipeline components'
794     runtime_constraints field from src to dst."""
795
796     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
797     for c_name, c_info in pipeline['components'].iteritems():
798         if ('runtime_constraints' in c_info and
799             'docker_image' in c_info['runtime_constraints']):
800             copy_docker_image(
801                 c_info['runtime_constraints']['docker_image'],
802                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
803                 src, dst, args)
804
805
806 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
807     """Copy the docker image identified by docker_image and
808     docker_image_tag from src to dst. Create appropriate
809     docker_image_repo+tag and docker_image_hash links at dst.
810
811     """
812
813     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
814
815     # Find the link identifying this docker image.
816     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
817         src, args.retries, docker_image, docker_image_tag)
818     if docker_image_list:
819         image_uuid, image_info = docker_image_list[0]
820         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
821
822         # Copy the collection it refers to.
823         dst_image_col = copy_collection(image_uuid, src, dst, args)
824     elif arvados.util.keep_locator_pattern.match(docker_image):
825         dst_image_col = copy_collection(docker_image, src, dst, args)
826     else:
827         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
828
829 # git_rev_parse(rev, repo)
830 #
831 #    Returns the 40-character commit hash corresponding to 'rev' in
832 #    git repository 'repo' (which must be the path of a local git
833 #    repository)
834 #
835 def git_rev_parse(rev, repo):
836     gitout, giterr = arvados.util.run_command(
837         ['git', 'rev-parse', rev], cwd=repo)
838     return gitout.strip()
839
840 # uuid_type(api, object_uuid)
841 #
842 #    Returns the name of the class that object_uuid belongs to, based on
843 #    the second field of the uuid.  This function consults the api's
844 #    schema to identify the object class.
845 #
846 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
847 #
848 #    Special case: if handed a Keep locator hash, return 'Collection'.
849 #
850 def uuid_type(api, object_uuid):
851     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
852         return 'Collection'
853     p = object_uuid.split('-')
854     if len(p) == 3:
855         type_prefix = p[1]
856         for k in api._schema.schemas:
857             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
858             if type_prefix == obj_class:
859                 return k
860     return None
861
862 def abort(msg, code=1):
863     logger.info("arv-copy: %s", msg)
864     exit(code)
865
866
867 # Code for reporting on the progress of a collection upload.
868 # Stolen from arvados.commands.put.ArvPutCollectionWriter
869 # TODO(twp): figure out how to refactor into a shared library
870 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
871 # code)
872
873 def machine_progress(obj_uuid, bytes_written, bytes_expected):
874     return "{} {}: {} {} written {} total\n".format(
875         sys.argv[0],
876         os.getpid(),
877         obj_uuid,
878         bytes_written,
879         -1 if (bytes_expected is None) else bytes_expected)
880
881 def human_progress(obj_uuid, bytes_written, bytes_expected):
882     if bytes_expected:
883         return "\r{}: {}M / {}M {:.1%} ".format(
884             obj_uuid,
885             bytes_written >> 20, bytes_expected >> 20,
886             float(bytes_written) / bytes_expected)
887     else:
888         return "\r{}: {} ".format(obj_uuid, bytes_written)
889
890 class ProgressWriter(object):
891     _progress_func = None
892     outfile = sys.stderr
893
894     def __init__(self, progress_func):
895         self._progress_func = progress_func
896
897     def report(self, obj_uuid, bytes_written, bytes_expected):
898         if self._progress_func is not None:
899             self.outfile.write(
900                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
901
902     def finish(self):
903         self.outfile.write("\n")
904
905 if __name__ == '__main__':
906     main()