closes #10110
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import contextlib
21 import getpass
22 import os
23 import re
24 import shutil
25 import sys
26 import logging
27 import tempfile
28 import urlparse
29
30 import arvados
31 import arvados.config
32 import arvados.keep
33 import arvados.util
34 import arvados.commands._util as arv_cmd
35 import arvados.commands.keepdocker
36 import ruamel.yaml as yaml
37
38 from arvados.api import OrderedJsonModel
39 from arvados._version import __version__
40
41 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
42
43 logger = logging.getLogger('arvados.arv-copy')
44
45 # local_repo_dir records which git repositories from the Arvados source
46 # instance have been checked out locally during this run, and to which
47 # directories.
48 # e.g. if repository 'twp' from src_arv has been cloned into
49 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
50 #
51 local_repo_dir = {}
52
53 # List of collections that have been copied in this session, and their
54 # destination collection UUIDs.
55 collections_copied = {}
56
57 # Set of (repository, script_version) two-tuples of commits copied in git.
58 scripts_copied = set()
59
60 # The owner_uuid of the object being copied
61 src_owner_uuid = None
62
63 def main():
64     copy_opts = argparse.ArgumentParser(add_help=False)
65
66     copy_opts.add_argument(
67         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
68         help='Print version and exit.')
69     copy_opts.add_argument(
70         '-v', '--verbose', dest='verbose', action='store_true',
71         help='Verbose output.')
72     copy_opts.add_argument(
73         '--progress', dest='progress', action='store_true',
74         help='Report progress on copying collections. (default)')
75     copy_opts.add_argument(
76         '--no-progress', dest='progress', action='store_false',
77         help='Do not report progress on copying collections.')
78     copy_opts.add_argument(
79         '-f', '--force', dest='force', action='store_true',
80         help='Perform copy even if the object appears to exist at the remote destination.')
81     copy_opts.add_argument(
82         '--force-filters', action='store_true', default=False,
83         help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
84     copy_opts.add_argument(
85         '--src', dest='source_arvados', required=True,
86         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
87     copy_opts.add_argument(
88         '--dst', dest='destination_arvados', required=True,
89         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
90     copy_opts.add_argument(
91         '--recursive', dest='recursive', action='store_true',
92         help='Recursively copy any dependencies for this object. (default)')
93     copy_opts.add_argument(
94         '--no-recursive', dest='recursive', action='store_false',
95         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
96     copy_opts.add_argument(
97         '--dst-git-repo', dest='dst_git_repo',
98         help='The name of the destination git repository. Required when copying a pipeline recursively.')
99     copy_opts.add_argument(
100         '--project-uuid', dest='project_uuid',
101         help='The UUID of the project at the destination to which the pipeline should be copied.')
102     copy_opts.add_argument(
103         '--allow-git-http-src', action="store_true",
104         help='Allow cloning git repositories over insecure http')
105     copy_opts.add_argument(
106         '--allow-git-http-dst', action="store_true",
107         help='Allow pushing git repositories over insecure http')
108
109     copy_opts.add_argument(
110         'object_uuid',
111         help='The UUID of the object to be copied.')
112     copy_opts.set_defaults(progress=True)
113     copy_opts.set_defaults(recursive=True)
114
115     parser = argparse.ArgumentParser(
116         description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
117         parents=[copy_opts, arv_cmd.retry_opt])
118     args = parser.parse_args()
119
120     if args.verbose:
121         logger.setLevel(logging.DEBUG)
122     else:
123         logger.setLevel(logging.INFO)
124
125     # Create API clients for the source and destination instances
126     src_arv = api_for_instance(args.source_arvados)
127     dst_arv = api_for_instance(args.destination_arvados)
128
129     if not args.project_uuid:
130         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
131
132     # Identify the kind of object we have been given, and begin copying.
133     t = uuid_type(src_arv, args.object_uuid)
134     if t == 'Collection':
135         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
136         result = copy_collection(args.object_uuid,
137                                  src_arv, dst_arv,
138                                  args)
139     elif t == 'PipelineInstance':
140         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
141         result = copy_pipeline_instance(args.object_uuid,
142                                         src_arv, dst_arv,
143                                         args)
144     elif t == 'PipelineTemplate':
145         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
146         result = copy_pipeline_template(args.object_uuid,
147                                         src_arv, dst_arv, args)
148     elif t == 'Workflow':
149         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
150         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
151     else:
152         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
153
154     # Clean up any outstanding temp git repositories.
155     for d in local_repo_dir.values():
156         shutil.rmtree(d, ignore_errors=True)
157
158     # If no exception was thrown and the response does not have an
159     # error_token field, presume success
160     if 'error_token' in result or 'uuid' not in result:
161         logger.error("API server returned an error result: {}".format(result))
162         exit(1)
163
164     logger.info("")
165     logger.info("Success: created copy with uuid {}".format(result['uuid']))
166     exit(0)
167
168 def set_src_owner_uuid(resource, uuid, args):
169     global src_owner_uuid
170     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
171     src_owner_uuid = c.get("owner_uuid")
172
173 # api_for_instance(instance_name)
174 #
175 #     Creates an API client for the Arvados instance identified by
176 #     instance_name.
177 #
178 #     If instance_name contains a slash, it is presumed to be a path
179 #     (either local or absolute) to a file with Arvados configuration
180 #     settings.
181 #
182 #     Otherwise, it is presumed to be the name of a file in
183 #     $HOME/.config/arvados/instance_name.conf
184 #
185 def api_for_instance(instance_name):
186     if '/' in instance_name:
187         config_file = instance_name
188     else:
189         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
190
191     try:
192         cfg = arvados.config.load(config_file)
193     except (IOError, OSError) as e:
194         abort(("Could not open config file {}: {}\n" +
195                "You must make sure that your configuration tokens\n" +
196                "for Arvados instance {} are in {} and that this\n" +
197                "file is readable.").format(
198                    config_file, e, instance_name, config_file))
199
200     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
201         api_is_insecure = (
202             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
203                 ['1', 't', 'true', 'y', 'yes']))
204         client = arvados.api('v1',
205                              host=cfg['ARVADOS_API_HOST'],
206                              token=cfg['ARVADOS_API_TOKEN'],
207                              insecure=api_is_insecure,
208                              model=OrderedJsonModel())
209     else:
210         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
211     return client
212
213 # Check if git is available
214 def check_git_availability():
215     try:
216         arvados.util.run_command(['git', '--help'])
217     except Exception:
218         abort('git command is not available. Please ensure git is installed.')
219
220 # copy_pipeline_instance(pi_uuid, src, dst, args)
221 #
222 #    Copies a pipeline instance identified by pi_uuid from src to dst.
223 #
224 #    If the args.recursive option is set:
225 #      1. Copies all input collections
226 #           * For each component in the pipeline, include all collections
227 #             listed as job dependencies for that component)
228 #      2. Copy docker images
229 #      3. Copy git repositories
230 #      4. Copy the pipeline template
231 #
232 #    The only changes made to the copied pipeline instance are:
233 #      1. The original pipeline instance UUID is preserved in
234 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
235 #      2. The pipeline_template_uuid is changed to the new template uuid.
236 #      3. The owner_uuid of the instance is changed to the user who
237 #         copied it.
238 #
239 def copy_pipeline_instance(pi_uuid, src, dst, args):
240     # Fetch the pipeline instance record.
241     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
242
243     if args.recursive:
244         check_git_availability()
245
246         if not args.dst_git_repo:
247             abort('--dst-git-repo is required when copying a pipeline recursively.')
248         # Copy the pipeline template and save the copied template.
249         if pi.get('pipeline_template_uuid', None):
250             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
251                                         src, dst, args)
252
253         # Copy input collections, docker images and git repos.
254         pi = copy_collections(pi, src, dst, args)
255         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
256         copy_docker_images(pi, src, dst, args)
257
258         # Update the fields of the pipeline instance with the copied
259         # pipeline template.
260         if pi.get('pipeline_template_uuid', None):
261             pi['pipeline_template_uuid'] = pt['uuid']
262
263     else:
264         # not recursive
265         logger.info("Copying only pipeline instance %s.", pi_uuid)
266         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
267
268     # Update the pipeline instance properties, and create the new
269     # instance at dst.
270     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
271     pi['description'] = "Pipeline copied from {}\n\n{}".format(
272         pi_uuid,
273         pi['description'] if pi.get('description', None) else '')
274
275     pi['owner_uuid'] = args.project_uuid
276
277     del pi['uuid']
278
279     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
280     return new_pi
281
282 def filter_iter(arg):
283     """Iterate a filter string-or-list.
284
285     Pass in a filter field that can either be a string or list.
286     This will iterate elements as if the field had been written as a list.
287     """
288     if isinstance(arg, basestring):
289         return iter((arg,))
290     else:
291         return iter(arg)
292
293 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
294     """Update a single repository filter in-place for the destination.
295
296     If the filter checks that the repository is src_repository, it is
297     updated to check that the repository is dst_repository.  If it does
298     anything else, this function raises ValueError.
299     """
300     if src_repository is None:
301         raise ValueError("component does not specify a source repository")
302     elif dst_repository is None:
303         raise ValueError("no destination repository specified to update repository filter")
304     elif repo_filter[1:] == ['=', src_repository]:
305         repo_filter[2] = dst_repository
306     elif repo_filter[1:] == ['in', [src_repository]]:
307         repo_filter[2] = [dst_repository]
308     else:
309         raise ValueError("repository filter is not a simple source match")
310
311 def migrate_script_version_filter(version_filter):
312     """Update a single script_version filter in-place for the destination.
313
314     Currently this function checks that all the filter operands are Git
315     commit hashes.  If they're not, it raises ValueError to indicate that
316     the filter is not portable.  It could be extended to make other
317     transformations in the future.
318     """
319     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
320         raise ValueError("script_version filter is not limited to commit hashes")
321
322 def attr_filtered(filter_, *attr_names):
323     """Return True if filter_ applies to any of attr_names, else False."""
324     return any((name == 'any') or (name in attr_names)
325                for name in filter_iter(filter_[0]))
326
327 @contextlib.contextmanager
328 def exception_handler(handler, *exc_types):
329     """If any exc_types are raised in the block, call handler on the exception."""
330     try:
331         yield
332     except exc_types as error:
333         handler(error)
334
335 def migrate_components_filters(template_components, dst_git_repo):
336     """Update template component filters in-place for the destination.
337
338     template_components is a dictionary of components in a pipeline template.
339     This method walks over each component's filters, and updates them to have
340     identical semantics on the destination cluster.  It returns a list of
341     error strings that describe what filters could not be updated safely.
342
343     dst_git_repo is the name of the destination Git repository, which can
344     be None if that is not known.
345     """
346     errors = []
347     for cname, cspec in template_components.iteritems():
348         def add_error(errmsg):
349             errors.append("{}: {}".format(cname, errmsg))
350         if not isinstance(cspec, dict):
351             add_error("value is not a component definition")
352             continue
353         src_repository = cspec.get('repository')
354         filters = cspec.get('filters', [])
355         if not isinstance(filters, list):
356             add_error("filters are not a list")
357             continue
358         for cfilter in filters:
359             if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
360                 add_error("malformed filter {!r}".format(cfilter))
361                 continue
362             if attr_filtered(cfilter, 'repository'):
363                 with exception_handler(add_error, ValueError):
364                     migrate_repository_filter(cfilter, src_repository, dst_git_repo)
365             if attr_filtered(cfilter, 'script_version'):
366                 with exception_handler(add_error, ValueError):
367                     migrate_script_version_filter(cfilter)
368     return errors
369
370 # copy_pipeline_template(pt_uuid, src, dst, args)
371 #
372 #    Copies a pipeline template identified by pt_uuid from src to dst.
373 #
374 #    If args.recursive is True, also copy any collections, docker
375 #    images and git repositories that this template references.
376 #
377 #    The owner_uuid of the new template is changed to that of the user
378 #    who copied the template.
379 #
380 #    Returns the copied pipeline template object.
381 #
382 def copy_pipeline_template(pt_uuid, src, dst, args):
383     # fetch the pipeline template from the source instance
384     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
385
386     if not args.force_filters:
387         filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
388         if filter_errors:
389             abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
390                   "\n".join(filter_errors))
391
392     if args.recursive:
393         check_git_availability()
394
395         if not args.dst_git_repo:
396             abort('--dst-git-repo is required when copying a pipeline recursively.')
397         # Copy input collections, docker images and git repos.
398         pt = copy_collections(pt, src, dst, args)
399         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
400         copy_docker_images(pt, src, dst, args)
401
402     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
403         pt_uuid,
404         pt['description'] if pt.get('description', None) else '')
405     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
406     del pt['uuid']
407
408     pt['owner_uuid'] = args.project_uuid
409
410     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
411
412 # copy_workflow(wf_uuid, src, dst, args)
413 #
414 #    Copies a workflow identified by wf_uuid from src to dst.
415 #
416 #    If args.recursive is True, also copy any collections
417 #      referenced in the workflow definition yaml.
418 #
419 #    The owner_uuid of the new workflow is set to any given
420 #      project_uuid or the user who copied the template.
421 #
422 #    Returns the copied workflow object.
423 #
424 def copy_workflow(wf_uuid, src, dst, args):
425     # fetch the workflow from the source instance
426     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
427
428     # copy collections and docker images
429     if args.recursive:
430         wf_def = yaml.safe_load(wf["definition"])
431         if wf_def is not None:
432             locations = []
433             docker_images = {}
434             graph = wf_def.get('$graph', None)
435             if graph is not None:
436                 workflow_collections(graph, locations, docker_images)
437             else:
438                 workflow_collections(wf_def, locations, docker_images)
439
440             if locations:
441                 copy_collections(locations, src, dst, args)
442
443             for image in docker_images:
444                 copy_docker_image(image, docker_images[image], src, dst, args)
445
446     # copy the workflow itself
447     del wf['uuid']
448     wf['owner_uuid'] = args.project_uuid
449     return dst.workflows().create(body=wf).execute(num_retries=args.retries)
450
451 def workflow_collections(obj, locations, docker_images):
452     if isinstance(obj, dict):
453         loc = obj.get('location', None)
454         if loc is not None:
455             if loc.startswith("keep:"):
456                 locations.append(loc[5:])
457
458         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
459         if docker_image is not None:
460             ds = docker_image.split(":", 1)
461             tag = ds[1] if len(ds)==2 else 'latest'
462             docker_images[ds[0]] = tag
463
464         for x in obj:
465             workflow_collections(obj[x], locations, docker_images)
466     elif isinstance(obj, list):
467         for x in obj:
468             workflow_collections(x, locations, docker_images)
469
470 # copy_collections(obj, src, dst, args)
471 #
472 #    Recursively copies all collections referenced by 'obj' from src
473 #    to dst.  obj may be a dict or a list, in which case we run
474 #    copy_collections on every value it contains. If it is a string,
475 #    search it for any substring that matches a collection hash or uuid
476 #    (this will find hidden references to collections like
477 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
478 #
479 #    Returns a copy of obj with any old collection uuids replaced by
480 #    the new ones.
481 #
482 def copy_collections(obj, src, dst, args):
483
484     def copy_collection_fn(collection_match):
485         """Helper function for regex substitution: copies a single collection,
486         identified by the collection_match MatchObject, to the
487         destination.  Returns the destination collection uuid (or the
488         portable data hash if that's what src_id is).
489
490         """
491         src_id = collection_match.group(0)
492         if src_id not in collections_copied:
493             dst_col = copy_collection(src_id, src, dst, args)
494             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
495                 collections_copied[src_id] = src_id
496             else:
497                 collections_copied[src_id] = dst_col['uuid']
498         return collections_copied[src_id]
499
500     if isinstance(obj, basestring):
501         # Copy any collections identified in this string to dst, replacing
502         # them with the dst uuids as necessary.
503         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
504         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
505         return obj
506     elif isinstance(obj, dict):
507         return type(obj)((v, copy_collections(obj[v], src, dst, args))
508                          for v in obj)
509     elif isinstance(obj, list):
510         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
511     return obj
512
513 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
514     """Copy a job's script to the destination repository, and update its record.
515
516     Given a jobspec dictionary, this function finds the referenced script from
517     src and copies it to dst and dst_repo.  It also updates jobspec in place to
518     refer to names on the destination.
519     """
520     repo = jobspec.get('repository')
521     if repo is None:
522         return
523     # script_version is the "script_version" parameter from the source
524     # component or job.  If no script_version was supplied in the
525     # component or job, it is a mistake in the pipeline, but for the
526     # purposes of copying the repository, default to "master".
527     script_version = jobspec.get('script_version') or 'master'
528     script_key = (repo, script_version)
529     if script_key not in scripts_copied:
530         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
531         scripts_copied.add(script_key)
532     jobspec['repository'] = dst_repo
533     repo_dir = local_repo_dir[repo]
534     for version_key in ['script_version', 'supplied_script_version']:
535         if version_key in jobspec:
536             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
537
538 # copy_git_repos(p, src, dst, dst_repo, args)
539 #
540 #    Copies all git repositories referenced by pipeline instance or
541 #    template 'p' from src to dst.
542 #
543 #    For each component c in the pipeline:
544 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
545 #      * Rename script versions:
546 #          * c['script_version']
547 #          * c['job']['script_version']
548 #          * c['job']['supplied_script_version']
549 #        to the commit hashes they resolve to, since any symbolic
550 #        names (tags, branches) are not preserved in the destination repo.
551 #
552 #    The pipeline object is updated in place with the new repository
553 #    names.  The return value is undefined.
554 #
555 def copy_git_repos(p, src, dst, dst_repo, args):
556     for component in p['components'].itervalues():
557         migrate_jobspec(component, src, dst, dst_repo, args)
558         if 'job' in component:
559             migrate_jobspec(component['job'], src, dst, dst_repo, args)
560
561 def total_collection_size(manifest_text):
562     """Return the total number of bytes in this collection (excluding
563     duplicate blocks)."""
564
565     total_bytes = 0
566     locators_seen = {}
567     for line in manifest_text.splitlines():
568         words = line.split()
569         for word in words[1:]:
570             try:
571                 loc = arvados.KeepLocator(word)
572             except ValueError:
573                 continue  # this word isn't a locator, skip it
574             if loc.md5sum not in locators_seen:
575                 locators_seen[loc.md5sum] = True
576                 total_bytes += loc.size
577
578     return total_bytes
579
580 def create_collection_from(c, src, dst, args):
581     """Create a new collection record on dst, and copy Docker metadata if
582     available."""
583
584     collection_uuid = c['uuid']
585     del c['uuid']
586
587     if not c["name"]:
588         c['name'] = "copied from " + collection_uuid
589
590     if 'properties' in c:
591         del c['properties']
592
593     c['owner_uuid'] = args.project_uuid
594
595     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
596
597     # Create docker_image_repo+tag and docker_image_hash links
598     # at the destination.
599     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
600         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
601
602         for src_link in docker_links:
603             body = {key: src_link[key]
604                     for key in ['link_class', 'name', 'properties']}
605             body['head_uuid'] = dst_collection['uuid']
606             body['owner_uuid'] = args.project_uuid
607
608             lk = dst.links().create(body=body).execute(num_retries=args.retries)
609             logger.debug('created dst link {}'.format(lk))
610
611     return dst_collection
612
613 # copy_collection(obj_uuid, src, dst, args)
614 #
615 #    Copies the collection identified by obj_uuid from src to dst.
616 #    Returns the collection object created at dst.
617 #
618 #    If args.progress is True, produce a human-friendly progress
619 #    report.
620 #
621 #    If a collection with the desired portable_data_hash already
622 #    exists at dst, and args.force is False, copy_collection returns
623 #    the existing collection without copying any blocks.  Otherwise
624 #    (if no collection exists or if args.force is True)
625 #    copy_collection copies all of the collection data blocks from src
626 #    to dst.
627 #
628 #    For this application, it is critical to preserve the
629 #    collection's manifest hash, which is not guaranteed with the
630 #    arvados.CollectionReader and arvados.CollectionWriter classes.
631 #    Copying each block in the collection manually, followed by
632 #    the manifest block, ensures that the collection's manifest
633 #    hash will not change.
634 #
635 def copy_collection(obj_uuid, src, dst, args):
636     if arvados.util.keep_locator_pattern.match(obj_uuid):
637         # If the obj_uuid is a portable data hash, it might not be uniquely
638         # identified with a particular collection.  As a result, it is
639         # ambigious as to what name to use for the copy.  Apply some heuristics
640         # to pick which collection to get the name from.
641         srccol = src.collections().list(
642             filters=[['portable_data_hash', '=', obj_uuid]],
643             order="created_at asc"
644             ).execute(num_retries=args.retries)
645
646         items = srccol.get("items")
647
648         if not items:
649             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
650             return
651
652         c = None
653
654         if len(items) == 1:
655             # There's only one collection with the PDH, so use that.
656             c = items[0]
657         if not c:
658             # See if there is a collection that's in the same project
659             # as the root item (usually a pipeline) being copied.
660             for i in items:
661                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
662                     c = i
663                     break
664         if not c:
665             # Didn't find any collections located in the same project, so
666             # pick the oldest collection that has a name assigned to it.
667             for i in items:
668                 if i.get("name"):
669                     c = i
670                     break
671         if not c:
672             # None of the collections have names (?!), so just pick the
673             # first one.
674             c = items[0]
675
676         # list() doesn't return manifest text (and we don't want it to,
677         # because we don't need the same maninfest text sent to us 50
678         # times) so go and retrieve the collection object directly
679         # which will include the manifest text.
680         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
681     else:
682         # Assume this is an actual collection uuid, so fetch it directly.
683         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
684
685     # If a collection with this hash already exists at the
686     # destination, and 'force' is not true, just return that
687     # collection.
688     if not args.force:
689         if 'portable_data_hash' in c:
690             colhash = c['portable_data_hash']
691         else:
692             colhash = c['uuid']
693         dstcol = dst.collections().list(
694             filters=[['portable_data_hash', '=', colhash]]
695         ).execute(num_retries=args.retries)
696         if dstcol['items_available'] > 0:
697             for d in dstcol['items']:
698                 if ((args.project_uuid == d['owner_uuid']) and
699                     (c.get('name') == d['name']) and
700                     (c['portable_data_hash'] == d['portable_data_hash'])):
701                     return d
702             c['manifest_text'] = dst.collections().get(
703                 uuid=dstcol['items'][0]['uuid']
704             ).execute(num_retries=args.retries)['manifest_text']
705             return create_collection_from(c, src, dst, args)
706
707     # Fetch the collection's manifest.
708     manifest = c['manifest_text']
709     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
710
711     # Copy each block from src_keep to dst_keep.
712     # Use the newly signed locators returned from dst_keep to build
713     # a new manifest as we go.
714     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
715     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
716     dst_manifest = ""
717     dst_locators = {}
718     bytes_written = 0
719     bytes_expected = total_collection_size(manifest)
720     if args.progress:
721         progress_writer = ProgressWriter(human_progress)
722     else:
723         progress_writer = None
724
725     for line in manifest.splitlines():
726         words = line.split()
727         dst_manifest += words[0]
728         for word in words[1:]:
729             try:
730                 loc = arvados.KeepLocator(word)
731             except ValueError:
732                 # If 'word' can't be parsed as a locator,
733                 # presume it's a filename.
734                 dst_manifest += ' ' + word
735                 continue
736             blockhash = loc.md5sum
737             # copy this block if we haven't seen it before
738             # (otherwise, just reuse the existing dst_locator)
739             if blockhash not in dst_locators:
740                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
741                 if progress_writer:
742                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
743                 data = src_keep.get(word)
744                 dst_locator = dst_keep.put(data)
745                 dst_locators[blockhash] = dst_locator
746                 bytes_written += loc.size
747             dst_manifest += ' ' + dst_locators[blockhash]
748         dst_manifest += "\n"
749
750     if progress_writer:
751         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
752         progress_writer.finish()
753
754     # Copy the manifest and save the collection.
755     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
756
757     c['manifest_text'] = dst_manifest
758     return create_collection_from(c, src, dst, args)
759
760 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
761     r = api.repositories().list(
762         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
763     if r['items_available'] != 1:
764         raise Exception('cannot identify repo {}; {} repos found'
765                         .format(repo_name, r['items_available']))
766
767     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
768     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
769     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
770
771     priority = https_url + other_url + http_url
772
773     git_config = []
774     git_url = None
775     for url in priority:
776         if url.startswith("http"):
777             u = urlparse.urlsplit(url)
778             baseurl = urlparse.urlunsplit((u.scheme, u.netloc, "", "", ""))
779             git_config = ["-c", "credential.%s/.username=none" % baseurl,
780                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
781         else:
782             git_config = []
783
784         try:
785             logger.debug("trying %s", url)
786             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
787                                       env={"HOME": os.environ["HOME"],
788                                            "ARVADOS_API_TOKEN": api.api_token,
789                                            "GIT_ASKPASS": "/bin/false"})
790         except arvados.errors.CommandFailedError:
791             pass
792         else:
793             git_url = url
794             break
795
796     if not git_url:
797         raise Exception('Cannot access git repository, tried {}'
798                         .format(priority))
799
800     if git_url.startswith("http:"):
801         if allow_insecure_http:
802             logger.warn("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
803         else:
804             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
805
806     return (git_url, git_config)
807
808
809 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
810 #
811 #    Copies commits from git repository 'src_git_repo' on Arvados
812 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
813 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
814 #    or "jsmith")
815 #
816 #    All commits will be copied to a destination branch named for the
817 #    source repository URL.
818 #
819 #    The destination repository must already exist.
820 #
821 #    The user running this command must be authenticated
822 #    to both repositories.
823 #
824 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
825     # Identify the fetch and push URLs for the git repositories.
826
827     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
828     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
829
830     logger.debug('src_git_url: {}'.format(src_git_url))
831     logger.debug('dst_git_url: {}'.format(dst_git_url))
832
833     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
834
835     # Copy git commits from src repo to dst repo.
836     if src_git_repo not in local_repo_dir:
837         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
838         arvados.util.run_command(
839             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
840              local_repo_dir[src_git_repo]],
841             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
842             env={"HOME": os.environ["HOME"],
843                  "ARVADOS_API_TOKEN": src.api_token,
844                  "GIT_ASKPASS": "/bin/false"})
845         arvados.util.run_command(
846             ["git", "remote", "add", "dst", dst_git_url],
847             cwd=local_repo_dir[src_git_repo])
848     arvados.util.run_command(
849         ["git", "branch", dst_branch, script_version],
850         cwd=local_repo_dir[src_git_repo])
851     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
852                              cwd=local_repo_dir[src_git_repo],
853                              env={"HOME": os.environ["HOME"],
854                                   "ARVADOS_API_TOKEN": dst.api_token,
855                                   "GIT_ASKPASS": "/bin/false"})
856
857 def copy_docker_images(pipeline, src, dst, args):
858     """Copy any docker images named in the pipeline components'
859     runtime_constraints field from src to dst."""
860
861     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
862     for c_name, c_info in pipeline['components'].iteritems():
863         if ('runtime_constraints' in c_info and
864             'docker_image' in c_info['runtime_constraints']):
865             copy_docker_image(
866                 c_info['runtime_constraints']['docker_image'],
867                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
868                 src, dst, args)
869
870
871 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
872     """Copy the docker image identified by docker_image and
873     docker_image_tag from src to dst. Create appropriate
874     docker_image_repo+tag and docker_image_hash links at dst.
875
876     """
877
878     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
879
880     # Find the link identifying this docker image.
881     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
882         src, args.retries, docker_image, docker_image_tag)
883     if docker_image_list:
884         image_uuid, image_info = docker_image_list[0]
885         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
886
887         # Copy the collection it refers to.
888         dst_image_col = copy_collection(image_uuid, src, dst, args)
889     elif arvados.util.keep_locator_pattern.match(docker_image):
890         dst_image_col = copy_collection(docker_image, src, dst, args)
891     else:
892         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
893
894 # git_rev_parse(rev, repo)
895 #
896 #    Returns the 40-character commit hash corresponding to 'rev' in
897 #    git repository 'repo' (which must be the path of a local git
898 #    repository)
899 #
900 def git_rev_parse(rev, repo):
901     gitout, giterr = arvados.util.run_command(
902         ['git', 'rev-parse', rev], cwd=repo)
903     return gitout.strip()
904
905 # uuid_type(api, object_uuid)
906 #
907 #    Returns the name of the class that object_uuid belongs to, based on
908 #    the second field of the uuid.  This function consults the api's
909 #    schema to identify the object class.
910 #
911 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
912 #
913 #    Special case: if handed a Keep locator hash, return 'Collection'.
914 #
915 def uuid_type(api, object_uuid):
916     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
917         return 'Collection'
918     p = object_uuid.split('-')
919     if len(p) == 3:
920         type_prefix = p[1]
921         for k in api._schema.schemas:
922             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
923             if type_prefix == obj_class:
924                 return k
925     return None
926
927 def abort(msg, code=1):
928     logger.info("arv-copy: %s", msg)
929     exit(code)
930
931
932 # Code for reporting on the progress of a collection upload.
933 # Stolen from arvados.commands.put.ArvPutCollectionWriter
934 # TODO(twp): figure out how to refactor into a shared library
935 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
936 # code)
937
938 def machine_progress(obj_uuid, bytes_written, bytes_expected):
939     return "{} {}: {} {} written {} total\n".format(
940         sys.argv[0],
941         os.getpid(),
942         obj_uuid,
943         bytes_written,
944         -1 if (bytes_expected is None) else bytes_expected)
945
946 def human_progress(obj_uuid, bytes_written, bytes_expected):
947     if bytes_expected:
948         return "\r{}: {}M / {}M {:.1%} ".format(
949             obj_uuid,
950             bytes_written >> 20, bytes_expected >> 20,
951             float(bytes_written) / bytes_expected)
952     else:
953         return "\r{}: {} ".format(obj_uuid, bytes_written)
954
955 class ProgressWriter(object):
956     _progress_func = None
957     outfile = sys.stderr
958
959     def __init__(self, progress_func):
960         self._progress_func = progress_func
961
962     def report(self, obj_uuid, bytes_written, bytes_expected):
963         if self._progress_func is not None:
964             self.outfile.write(
965                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
966
967     def finish(self):
968         self.outfile.write("\n")
969
970 if __name__ == '__main__':
971     main()