11308: Futurize stage2.
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 from __future__ import division
20 from future import standard_library
21 standard_library.install_aliases()
22 from past.builtins import basestring
23 from builtins import object
24 from past.utils import old_div
25 import argparse
26 import contextlib
27 import getpass
28 import os
29 import re
30 import shutil
31 import sys
32 import logging
33 import tempfile
34 import urllib.parse
35
36 import arvados
37 import arvados.config
38 import arvados.keep
39 import arvados.util
40 import arvados.commands._util as arv_cmd
41 import arvados.commands.keepdocker
42 import ruamel.yaml as yaml
43
44 from arvados.api import OrderedJsonModel
45 from arvados._version import __version__
46
47 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
48
49 logger = logging.getLogger('arvados.arv-copy')
50
51 # local_repo_dir records which git repositories from the Arvados source
52 # instance have been checked out locally during this run, and to which
53 # directories.
54 # e.g. if repository 'twp' from src_arv has been cloned into
55 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
56 #
57 local_repo_dir = {}
58
59 # List of collections that have been copied in this session, and their
60 # destination collection UUIDs.
61 collections_copied = {}
62
63 # Set of (repository, script_version) two-tuples of commits copied in git.
64 scripts_copied = set()
65
66 # The owner_uuid of the object being copied
67 src_owner_uuid = None
68
69 def main():
70     copy_opts = argparse.ArgumentParser(add_help=False)
71
72     copy_opts.add_argument(
73         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
74         help='Print version and exit.')
75     copy_opts.add_argument(
76         '-v', '--verbose', dest='verbose', action='store_true',
77         help='Verbose output.')
78     copy_opts.add_argument(
79         '--progress', dest='progress', action='store_true',
80         help='Report progress on copying collections. (default)')
81     copy_opts.add_argument(
82         '--no-progress', dest='progress', action='store_false',
83         help='Do not report progress on copying collections.')
84     copy_opts.add_argument(
85         '-f', '--force', dest='force', action='store_true',
86         help='Perform copy even if the object appears to exist at the remote destination.')
87     copy_opts.add_argument(
88         '--force-filters', action='store_true', default=False,
89         help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
90     copy_opts.add_argument(
91         '--src', dest='source_arvados', required=True,
92         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
93     copy_opts.add_argument(
94         '--dst', dest='destination_arvados', required=True,
95         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
96     copy_opts.add_argument(
97         '--recursive', dest='recursive', action='store_true',
98         help='Recursively copy any dependencies for this object. (default)')
99     copy_opts.add_argument(
100         '--no-recursive', dest='recursive', action='store_false',
101         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
102     copy_opts.add_argument(
103         '--dst-git-repo', dest='dst_git_repo',
104         help='The name of the destination git repository. Required when copying a pipeline recursively.')
105     copy_opts.add_argument(
106         '--project-uuid', dest='project_uuid',
107         help='The UUID of the project at the destination to which the pipeline should be copied.')
108     copy_opts.add_argument(
109         '--allow-git-http-src', action="store_true",
110         help='Allow cloning git repositories over insecure http')
111     copy_opts.add_argument(
112         '--allow-git-http-dst', action="store_true",
113         help='Allow pushing git repositories over insecure http')
114
115     copy_opts.add_argument(
116         'object_uuid',
117         help='The UUID of the object to be copied.')
118     copy_opts.set_defaults(progress=True)
119     copy_opts.set_defaults(recursive=True)
120
121     parser = argparse.ArgumentParser(
122         description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
123         parents=[copy_opts, arv_cmd.retry_opt])
124     args = parser.parse_args()
125
126     if args.verbose:
127         logger.setLevel(logging.DEBUG)
128     else:
129         logger.setLevel(logging.INFO)
130
131     # Create API clients for the source and destination instances
132     src_arv = api_for_instance(args.source_arvados)
133     dst_arv = api_for_instance(args.destination_arvados)
134
135     if not args.project_uuid:
136         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
137
138     # Identify the kind of object we have been given, and begin copying.
139     t = uuid_type(src_arv, args.object_uuid)
140     if t == 'Collection':
141         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
142         result = copy_collection(args.object_uuid,
143                                  src_arv, dst_arv,
144                                  args)
145     elif t == 'PipelineInstance':
146         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
147         result = copy_pipeline_instance(args.object_uuid,
148                                         src_arv, dst_arv,
149                                         args)
150     elif t == 'PipelineTemplate':
151         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
152         result = copy_pipeline_template(args.object_uuid,
153                                         src_arv, dst_arv, args)
154     elif t == 'Workflow':
155         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
156         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
157     else:
158         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
159
160     # Clean up any outstanding temp git repositories.
161     for d in list(local_repo_dir.values()):
162         shutil.rmtree(d, ignore_errors=True)
163
164     # If no exception was thrown and the response does not have an
165     # error_token field, presume success
166     if 'error_token' in result or 'uuid' not in result:
167         logger.error("API server returned an error result: {}".format(result))
168         exit(1)
169
170     logger.info("")
171     logger.info("Success: created copy with uuid {}".format(result['uuid']))
172     exit(0)
173
174 def set_src_owner_uuid(resource, uuid, args):
175     global src_owner_uuid
176     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
177     src_owner_uuid = c.get("owner_uuid")
178
179 # api_for_instance(instance_name)
180 #
181 #     Creates an API client for the Arvados instance identified by
182 #     instance_name.
183 #
184 #     If instance_name contains a slash, it is presumed to be a path
185 #     (either local or absolute) to a file with Arvados configuration
186 #     settings.
187 #
188 #     Otherwise, it is presumed to be the name of a file in
189 #     $HOME/.config/arvados/instance_name.conf
190 #
191 def api_for_instance(instance_name):
192     if '/' in instance_name:
193         config_file = instance_name
194     else:
195         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
196
197     try:
198         cfg = arvados.config.load(config_file)
199     except (IOError, OSError) as e:
200         abort(("Could not open config file {}: {}\n" +
201                "You must make sure that your configuration tokens\n" +
202                "for Arvados instance {} are in {} and that this\n" +
203                "file is readable.").format(
204                    config_file, e, instance_name, config_file))
205
206     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
207         api_is_insecure = (
208             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
209                 ['1', 't', 'true', 'y', 'yes']))
210         client = arvados.api('v1',
211                              host=cfg['ARVADOS_API_HOST'],
212                              token=cfg['ARVADOS_API_TOKEN'],
213                              insecure=api_is_insecure,
214                              model=OrderedJsonModel())
215     else:
216         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
217     return client
218
219 # Check if git is available
220 def check_git_availability():
221     try:
222         arvados.util.run_command(['git', '--help'])
223     except Exception:
224         abort('git command is not available. Please ensure git is installed.')
225
226 # copy_pipeline_instance(pi_uuid, src, dst, args)
227 #
228 #    Copies a pipeline instance identified by pi_uuid from src to dst.
229 #
230 #    If the args.recursive option is set:
231 #      1. Copies all input collections
232 #           * For each component in the pipeline, include all collections
233 #             listed as job dependencies for that component)
234 #      2. Copy docker images
235 #      3. Copy git repositories
236 #      4. Copy the pipeline template
237 #
238 #    The only changes made to the copied pipeline instance are:
239 #      1. The original pipeline instance UUID is preserved in
240 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
241 #      2. The pipeline_template_uuid is changed to the new template uuid.
242 #      3. The owner_uuid of the instance is changed to the user who
243 #         copied it.
244 #
245 def copy_pipeline_instance(pi_uuid, src, dst, args):
246     # Fetch the pipeline instance record.
247     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
248
249     if args.recursive:
250         check_git_availability()
251
252         if not args.dst_git_repo:
253             abort('--dst-git-repo is required when copying a pipeline recursively.')
254         # Copy the pipeline template and save the copied template.
255         if pi.get('pipeline_template_uuid', None):
256             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
257                                         src, dst, args)
258
259         # Copy input collections, docker images and git repos.
260         pi = copy_collections(pi, src, dst, args)
261         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
262         copy_docker_images(pi, src, dst, args)
263
264         # Update the fields of the pipeline instance with the copied
265         # pipeline template.
266         if pi.get('pipeline_template_uuid', None):
267             pi['pipeline_template_uuid'] = pt['uuid']
268
269     else:
270         # not recursive
271         logger.info("Copying only pipeline instance %s.", pi_uuid)
272         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
273
274     # Update the pipeline instance properties, and create the new
275     # instance at dst.
276     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
277     pi['description'] = "Pipeline copied from {}\n\n{}".format(
278         pi_uuid,
279         pi['description'] if pi.get('description', None) else '')
280
281     pi['owner_uuid'] = args.project_uuid
282
283     del pi['uuid']
284
285     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
286     return new_pi
287
288 def filter_iter(arg):
289     """Iterate a filter string-or-list.
290
291     Pass in a filter field that can either be a string or list.
292     This will iterate elements as if the field had been written as a list.
293     """
294     if isinstance(arg, basestring):
295         return iter((arg,))
296     else:
297         return iter(arg)
298
299 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
300     """Update a single repository filter in-place for the destination.
301
302     If the filter checks that the repository is src_repository, it is
303     updated to check that the repository is dst_repository.  If it does
304     anything else, this function raises ValueError.
305     """
306     if src_repository is None:
307         raise ValueError("component does not specify a source repository")
308     elif dst_repository is None:
309         raise ValueError("no destination repository specified to update repository filter")
310     elif repo_filter[1:] == ['=', src_repository]:
311         repo_filter[2] = dst_repository
312     elif repo_filter[1:] == ['in', [src_repository]]:
313         repo_filter[2] = [dst_repository]
314     else:
315         raise ValueError("repository filter is not a simple source match")
316
317 def migrate_script_version_filter(version_filter):
318     """Update a single script_version filter in-place for the destination.
319
320     Currently this function checks that all the filter operands are Git
321     commit hashes.  If they're not, it raises ValueError to indicate that
322     the filter is not portable.  It could be extended to make other
323     transformations in the future.
324     """
325     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
326         raise ValueError("script_version filter is not limited to commit hashes")
327
328 def attr_filtered(filter_, *attr_names):
329     """Return True if filter_ applies to any of attr_names, else False."""
330     return any((name == 'any') or (name in attr_names)
331                for name in filter_iter(filter_[0]))
332
333 @contextlib.contextmanager
334 def exception_handler(handler, *exc_types):
335     """If any exc_types are raised in the block, call handler on the exception."""
336     try:
337         yield
338     except exc_types as error:
339         handler(error)
340
341 def migrate_components_filters(template_components, dst_git_repo):
342     """Update template component filters in-place for the destination.
343
344     template_components is a dictionary of components in a pipeline template.
345     This method walks over each component's filters, and updates them to have
346     identical semantics on the destination cluster.  It returns a list of
347     error strings that describe what filters could not be updated safely.
348
349     dst_git_repo is the name of the destination Git repository, which can
350     be None if that is not known.
351     """
352     errors = []
353     for cname, cspec in template_components.items():
354         def add_error(errmsg):
355             errors.append("{}: {}".format(cname, errmsg))
356         if not isinstance(cspec, dict):
357             add_error("value is not a component definition")
358             continue
359         src_repository = cspec.get('repository')
360         filters = cspec.get('filters', [])
361         if not isinstance(filters, list):
362             add_error("filters are not a list")
363             continue
364         for cfilter in filters:
365             if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
366                 add_error("malformed filter {!r}".format(cfilter))
367                 continue
368             if attr_filtered(cfilter, 'repository'):
369                 with exception_handler(add_error, ValueError):
370                     migrate_repository_filter(cfilter, src_repository, dst_git_repo)
371             if attr_filtered(cfilter, 'script_version'):
372                 with exception_handler(add_error, ValueError):
373                     migrate_script_version_filter(cfilter)
374     return errors
375
376 # copy_pipeline_template(pt_uuid, src, dst, args)
377 #
378 #    Copies a pipeline template identified by pt_uuid from src to dst.
379 #
380 #    If args.recursive is True, also copy any collections, docker
381 #    images and git repositories that this template references.
382 #
383 #    The owner_uuid of the new template is changed to that of the user
384 #    who copied the template.
385 #
386 #    Returns the copied pipeline template object.
387 #
388 def copy_pipeline_template(pt_uuid, src, dst, args):
389     # fetch the pipeline template from the source instance
390     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
391
392     if not args.force_filters:
393         filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
394         if filter_errors:
395             abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
396                   "\n".join(filter_errors))
397
398     if args.recursive:
399         check_git_availability()
400
401         if not args.dst_git_repo:
402             abort('--dst-git-repo is required when copying a pipeline recursively.')
403         # Copy input collections, docker images and git repos.
404         pt = copy_collections(pt, src, dst, args)
405         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
406         copy_docker_images(pt, src, dst, args)
407
408     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
409         pt_uuid,
410         pt['description'] if pt.get('description', None) else '')
411     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
412     del pt['uuid']
413
414     pt['owner_uuid'] = args.project_uuid
415
416     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
417
418 # copy_workflow(wf_uuid, src, dst, args)
419 #
420 #    Copies a workflow identified by wf_uuid from src to dst.
421 #
422 #    If args.recursive is True, also copy any collections
423 #      referenced in the workflow definition yaml.
424 #
425 #    The owner_uuid of the new workflow is set to any given
426 #      project_uuid or the user who copied the template.
427 #
428 #    Returns the copied workflow object.
429 #
430 def copy_workflow(wf_uuid, src, dst, args):
431     # fetch the workflow from the source instance
432     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
433
434     # copy collections and docker images
435     if args.recursive:
436         wf_def = yaml.safe_load(wf["definition"])
437         if wf_def is not None:
438             locations = []
439             docker_images = {}
440             graph = wf_def.get('$graph', None)
441             if graph is not None:
442                 workflow_collections(graph, locations, docker_images)
443             else:
444                 workflow_collections(wf_def, locations, docker_images)
445
446             if locations:
447                 copy_collections(locations, src, dst, args)
448
449             for image in docker_images:
450                 copy_docker_image(image, docker_images[image], src, dst, args)
451
452     # copy the workflow itself
453     del wf['uuid']
454     wf['owner_uuid'] = args.project_uuid
455     return dst.workflows().create(body=wf).execute(num_retries=args.retries)
456
457 def workflow_collections(obj, locations, docker_images):
458     if isinstance(obj, dict):
459         loc = obj.get('location', None)
460         if loc is not None:
461             if loc.startswith("keep:"):
462                 locations.append(loc[5:])
463
464         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
465         if docker_image is not None:
466             ds = docker_image.split(":", 1)
467             tag = ds[1] if len(ds)==2 else 'latest'
468             docker_images[ds[0]] = tag
469
470         for x in obj:
471             workflow_collections(obj[x], locations, docker_images)
472     elif isinstance(obj, list):
473         for x in obj:
474             workflow_collections(x, locations, docker_images)
475
476 # copy_collections(obj, src, dst, args)
477 #
478 #    Recursively copies all collections referenced by 'obj' from src
479 #    to dst.  obj may be a dict or a list, in which case we run
480 #    copy_collections on every value it contains. If it is a string,
481 #    search it for any substring that matches a collection hash or uuid
482 #    (this will find hidden references to collections like
483 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
484 #
485 #    Returns a copy of obj with any old collection uuids replaced by
486 #    the new ones.
487 #
488 def copy_collections(obj, src, dst, args):
489
490     def copy_collection_fn(collection_match):
491         """Helper function for regex substitution: copies a single collection,
492         identified by the collection_match MatchObject, to the
493         destination.  Returns the destination collection uuid (or the
494         portable data hash if that's what src_id is).
495
496         """
497         src_id = collection_match.group(0)
498         if src_id not in collections_copied:
499             dst_col = copy_collection(src_id, src, dst, args)
500             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
501                 collections_copied[src_id] = src_id
502             else:
503                 collections_copied[src_id] = dst_col['uuid']
504         return collections_copied[src_id]
505
506     if isinstance(obj, basestring):
507         # Copy any collections identified in this string to dst, replacing
508         # them with the dst uuids as necessary.
509         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
510         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
511         return obj
512     elif isinstance(obj, dict):
513         return type(obj)((v, copy_collections(obj[v], src, dst, args))
514                          for v in obj)
515     elif isinstance(obj, list):
516         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
517     return obj
518
519 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
520     """Copy a job's script to the destination repository, and update its record.
521
522     Given a jobspec dictionary, this function finds the referenced script from
523     src and copies it to dst and dst_repo.  It also updates jobspec in place to
524     refer to names on the destination.
525     """
526     repo = jobspec.get('repository')
527     if repo is None:
528         return
529     # script_version is the "script_version" parameter from the source
530     # component or job.  If no script_version was supplied in the
531     # component or job, it is a mistake in the pipeline, but for the
532     # purposes of copying the repository, default to "master".
533     script_version = jobspec.get('script_version') or 'master'
534     script_key = (repo, script_version)
535     if script_key not in scripts_copied:
536         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
537         scripts_copied.add(script_key)
538     jobspec['repository'] = dst_repo
539     repo_dir = local_repo_dir[repo]
540     for version_key in ['script_version', 'supplied_script_version']:
541         if version_key in jobspec:
542             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
543
544 # copy_git_repos(p, src, dst, dst_repo, args)
545 #
546 #    Copies all git repositories referenced by pipeline instance or
547 #    template 'p' from src to dst.
548 #
549 #    For each component c in the pipeline:
550 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
551 #      * Rename script versions:
552 #          * c['script_version']
553 #          * c['job']['script_version']
554 #          * c['job']['supplied_script_version']
555 #        to the commit hashes they resolve to, since any symbolic
556 #        names (tags, branches) are not preserved in the destination repo.
557 #
558 #    The pipeline object is updated in place with the new repository
559 #    names.  The return value is undefined.
560 #
561 def copy_git_repos(p, src, dst, dst_repo, args):
562     for component in p['components'].values():
563         migrate_jobspec(component, src, dst, dst_repo, args)
564         if 'job' in component:
565             migrate_jobspec(component['job'], src, dst, dst_repo, args)
566
567 def total_collection_size(manifest_text):
568     """Return the total number of bytes in this collection (excluding
569     duplicate blocks)."""
570
571     total_bytes = 0
572     locators_seen = {}
573     for line in manifest_text.splitlines():
574         words = line.split()
575         for word in words[1:]:
576             try:
577                 loc = arvados.KeepLocator(word)
578             except ValueError:
579                 continue  # this word isn't a locator, skip it
580             if loc.md5sum not in locators_seen:
581                 locators_seen[loc.md5sum] = True
582                 total_bytes += loc.size
583
584     return total_bytes
585
586 def create_collection_from(c, src, dst, args):
587     """Create a new collection record on dst, and copy Docker metadata if
588     available."""
589
590     collection_uuid = c['uuid']
591     del c['uuid']
592
593     if not c["name"]:
594         c['name'] = "copied from " + collection_uuid
595
596     if 'properties' in c:
597         del c['properties']
598
599     c['owner_uuid'] = args.project_uuid
600
601     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
602
603     # Create docker_image_repo+tag and docker_image_hash links
604     # at the destination.
605     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
606         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
607
608         for src_link in docker_links:
609             body = {key: src_link[key]
610                     for key in ['link_class', 'name', 'properties']}
611             body['head_uuid'] = dst_collection['uuid']
612             body['owner_uuid'] = args.project_uuid
613
614             lk = dst.links().create(body=body).execute(num_retries=args.retries)
615             logger.debug('created dst link {}'.format(lk))
616
617     return dst_collection
618
619 # copy_collection(obj_uuid, src, dst, args)
620 #
621 #    Copies the collection identified by obj_uuid from src to dst.
622 #    Returns the collection object created at dst.
623 #
624 #    If args.progress is True, produce a human-friendly progress
625 #    report.
626 #
627 #    If a collection with the desired portable_data_hash already
628 #    exists at dst, and args.force is False, copy_collection returns
629 #    the existing collection without copying any blocks.  Otherwise
630 #    (if no collection exists or if args.force is True)
631 #    copy_collection copies all of the collection data blocks from src
632 #    to dst.
633 #
634 #    For this application, it is critical to preserve the
635 #    collection's manifest hash, which is not guaranteed with the
636 #    arvados.CollectionReader and arvados.CollectionWriter classes.
637 #    Copying each block in the collection manually, followed by
638 #    the manifest block, ensures that the collection's manifest
639 #    hash will not change.
640 #
641 def copy_collection(obj_uuid, src, dst, args):
642     if arvados.util.keep_locator_pattern.match(obj_uuid):
643         # If the obj_uuid is a portable data hash, it might not be uniquely
644         # identified with a particular collection.  As a result, it is
645         # ambigious as to what name to use for the copy.  Apply some heuristics
646         # to pick which collection to get the name from.
647         srccol = src.collections().list(
648             filters=[['portable_data_hash', '=', obj_uuid]],
649             order="created_at asc"
650             ).execute(num_retries=args.retries)
651
652         items = srccol.get("items")
653
654         if not items:
655             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
656             return
657
658         c = None
659
660         if len(items) == 1:
661             # There's only one collection with the PDH, so use that.
662             c = items[0]
663         if not c:
664             # See if there is a collection that's in the same project
665             # as the root item (usually a pipeline) being copied.
666             for i in items:
667                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
668                     c = i
669                     break
670         if not c:
671             # Didn't find any collections located in the same project, so
672             # pick the oldest collection that has a name assigned to it.
673             for i in items:
674                 if i.get("name"):
675                     c = i
676                     break
677         if not c:
678             # None of the collections have names (?!), so just pick the
679             # first one.
680             c = items[0]
681
682         # list() doesn't return manifest text (and we don't want it to,
683         # because we don't need the same maninfest text sent to us 50
684         # times) so go and retrieve the collection object directly
685         # which will include the manifest text.
686         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
687     else:
688         # Assume this is an actual collection uuid, so fetch it directly.
689         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
690
691     # If a collection with this hash already exists at the
692     # destination, and 'force' is not true, just return that
693     # collection.
694     if not args.force:
695         if 'portable_data_hash' in c:
696             colhash = c['portable_data_hash']
697         else:
698             colhash = c['uuid']
699         dstcol = dst.collections().list(
700             filters=[['portable_data_hash', '=', colhash]]
701         ).execute(num_retries=args.retries)
702         if dstcol['items_available'] > 0:
703             for d in dstcol['items']:
704                 if ((args.project_uuid == d['owner_uuid']) and
705                     (c.get('name') == d['name']) and
706                     (c['portable_data_hash'] == d['portable_data_hash'])):
707                     return d
708             c['manifest_text'] = dst.collections().get(
709                 uuid=dstcol['items'][0]['uuid']
710             ).execute(num_retries=args.retries)['manifest_text']
711             return create_collection_from(c, src, dst, args)
712
713     # Fetch the collection's manifest.
714     manifest = c['manifest_text']
715     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
716
717     # Copy each block from src_keep to dst_keep.
718     # Use the newly signed locators returned from dst_keep to build
719     # a new manifest as we go.
720     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
721     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
722     dst_manifest = ""
723     dst_locators = {}
724     bytes_written = 0
725     bytes_expected = total_collection_size(manifest)
726     if args.progress:
727         progress_writer = ProgressWriter(human_progress)
728     else:
729         progress_writer = None
730
731     for line in manifest.splitlines():
732         words = line.split()
733         dst_manifest += words[0]
734         for word in words[1:]:
735             try:
736                 loc = arvados.KeepLocator(word)
737             except ValueError:
738                 # If 'word' can't be parsed as a locator,
739                 # presume it's a filename.
740                 dst_manifest += ' ' + word
741                 continue
742             blockhash = loc.md5sum
743             # copy this block if we haven't seen it before
744             # (otherwise, just reuse the existing dst_locator)
745             if blockhash not in dst_locators:
746                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
747                 if progress_writer:
748                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
749                 data = src_keep.get(word)
750                 dst_locator = dst_keep.put(data)
751                 dst_locators[blockhash] = dst_locator
752                 bytes_written += loc.size
753             dst_manifest += ' ' + dst_locators[blockhash]
754         dst_manifest += "\n"
755
756     if progress_writer:
757         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
758         progress_writer.finish()
759
760     # Copy the manifest and save the collection.
761     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
762
763     c['manifest_text'] = dst_manifest
764     return create_collection_from(c, src, dst, args)
765
766 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
767     r = api.repositories().list(
768         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
769     if r['items_available'] != 1:
770         raise Exception('cannot identify repo {}; {} repos found'
771                         .format(repo_name, r['items_available']))
772
773     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
774     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
775     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
776
777     priority = https_url + other_url + http_url
778
779     git_config = []
780     git_url = None
781     for url in priority:
782         if url.startswith("http"):
783             u = urllib.parse.urlsplit(url)
784             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
785             git_config = ["-c", "credential.%s/.username=none" % baseurl,
786                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
787         else:
788             git_config = []
789
790         try:
791             logger.debug("trying %s", url)
792             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
793                                       env={"HOME": os.environ["HOME"],
794                                            "ARVADOS_API_TOKEN": api.api_token,
795                                            "GIT_ASKPASS": "/bin/false"})
796         except arvados.errors.CommandFailedError:
797             pass
798         else:
799             git_url = url
800             break
801
802     if not git_url:
803         raise Exception('Cannot access git repository, tried {}'
804                         .format(priority))
805
806     if git_url.startswith("http:"):
807         if allow_insecure_http:
808             logger.warn("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
809         else:
810             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
811
812     return (git_url, git_config)
813
814
815 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
816 #
817 #    Copies commits from git repository 'src_git_repo' on Arvados
818 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
819 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
820 #    or "jsmith")
821 #
822 #    All commits will be copied to a destination branch named for the
823 #    source repository URL.
824 #
825 #    The destination repository must already exist.
826 #
827 #    The user running this command must be authenticated
828 #    to both repositories.
829 #
830 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
831     # Identify the fetch and push URLs for the git repositories.
832
833     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
834     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
835
836     logger.debug('src_git_url: {}'.format(src_git_url))
837     logger.debug('dst_git_url: {}'.format(dst_git_url))
838
839     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
840
841     # Copy git commits from src repo to dst repo.
842     if src_git_repo not in local_repo_dir:
843         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
844         arvados.util.run_command(
845             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
846              local_repo_dir[src_git_repo]],
847             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
848             env={"HOME": os.environ["HOME"],
849                  "ARVADOS_API_TOKEN": src.api_token,
850                  "GIT_ASKPASS": "/bin/false"})
851         arvados.util.run_command(
852             ["git", "remote", "add", "dst", dst_git_url],
853             cwd=local_repo_dir[src_git_repo])
854     arvados.util.run_command(
855         ["git", "branch", dst_branch, script_version],
856         cwd=local_repo_dir[src_git_repo])
857     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
858                              cwd=local_repo_dir[src_git_repo],
859                              env={"HOME": os.environ["HOME"],
860                                   "ARVADOS_API_TOKEN": dst.api_token,
861                                   "GIT_ASKPASS": "/bin/false"})
862
863 def copy_docker_images(pipeline, src, dst, args):
864     """Copy any docker images named in the pipeline components'
865     runtime_constraints field from src to dst."""
866
867     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
868     for c_name, c_info in pipeline['components'].items():
869         if ('runtime_constraints' in c_info and
870             'docker_image' in c_info['runtime_constraints']):
871             copy_docker_image(
872                 c_info['runtime_constraints']['docker_image'],
873                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
874                 src, dst, args)
875
876
877 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
878     """Copy the docker image identified by docker_image and
879     docker_image_tag from src to dst. Create appropriate
880     docker_image_repo+tag and docker_image_hash links at dst.
881
882     """
883
884     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
885
886     # Find the link identifying this docker image.
887     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
888         src, args.retries, docker_image, docker_image_tag)
889     if docker_image_list:
890         image_uuid, image_info = docker_image_list[0]
891         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
892
893         # Copy the collection it refers to.
894         dst_image_col = copy_collection(image_uuid, src, dst, args)
895     elif arvados.util.keep_locator_pattern.match(docker_image):
896         dst_image_col = copy_collection(docker_image, src, dst, args)
897     else:
898         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
899
900 # git_rev_parse(rev, repo)
901 #
902 #    Returns the 40-character commit hash corresponding to 'rev' in
903 #    git repository 'repo' (which must be the path of a local git
904 #    repository)
905 #
906 def git_rev_parse(rev, repo):
907     gitout, giterr = arvados.util.run_command(
908         ['git', 'rev-parse', rev], cwd=repo)
909     return gitout.strip()
910
911 # uuid_type(api, object_uuid)
912 #
913 #    Returns the name of the class that object_uuid belongs to, based on
914 #    the second field of the uuid.  This function consults the api's
915 #    schema to identify the object class.
916 #
917 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
918 #
919 #    Special case: if handed a Keep locator hash, return 'Collection'.
920 #
921 def uuid_type(api, object_uuid):
922     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
923         return 'Collection'
924     p = object_uuid.split('-')
925     if len(p) == 3:
926         type_prefix = p[1]
927         for k in api._schema.schemas:
928             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
929             if type_prefix == obj_class:
930                 return k
931     return None
932
933 def abort(msg, code=1):
934     logger.info("arv-copy: %s", msg)
935     exit(code)
936
937
938 # Code for reporting on the progress of a collection upload.
939 # Stolen from arvados.commands.put.ArvPutCollectionWriter
940 # TODO(twp): figure out how to refactor into a shared library
941 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
942 # code)
943
944 def machine_progress(obj_uuid, bytes_written, bytes_expected):
945     return "{} {}: {} {} written {} total\n".format(
946         sys.argv[0],
947         os.getpid(),
948         obj_uuid,
949         bytes_written,
950         -1 if (bytes_expected is None) else bytes_expected)
951
952 def human_progress(obj_uuid, bytes_written, bytes_expected):
953     if bytes_expected:
954         return "\r{}: {}M / {}M {:.1%} ".format(
955             obj_uuid,
956             bytes_written >> 20, bytes_expected >> 20,
957             old_div(float(bytes_written), bytes_expected))
958     else:
959         return "\r{}: {} ".format(obj_uuid, bytes_written)
960
961 class ProgressWriter(object):
962     _progress_func = None
963     outfile = sys.stderr
964
965     def __init__(self, progress_func):
966         self._progress_func = progress_func
967
968     def report(self, obj_uuid, bytes_written, bytes_expected):
969         if self._progress_func is not None:
970             self.outfile.write(
971                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
972
973     def finish(self):
974         self.outfile.write("\n")
975
976 if __name__ == '__main__':
977     main()