Merge branch '12568-cwl-collection-cache' closes #12568
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a pipeline instance, arv-copy copies the pipeline
12 # template, input collection, docker images, git repositories). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import sys
34 import logging
35 import tempfile
36 import urllib.parse
37
38 import arvados
39 import arvados.config
40 import arvados.keep
41 import arvados.util
42 import arvados.commands._util as arv_cmd
43 import arvados.commands.keepdocker
44 import ruamel.yaml as yaml
45
46 from arvados.api import OrderedJsonModel
47 from arvados._version import __version__
48
49 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
50
51 logger = logging.getLogger('arvados.arv-copy')
52
53 # local_repo_dir records which git repositories from the Arvados source
54 # instance have been checked out locally during this run, and to which
55 # directories.
56 # e.g. if repository 'twp' from src_arv has been cloned into
57 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
58 #
59 local_repo_dir = {}
60
61 # List of collections that have been copied in this session, and their
62 # destination collection UUIDs.
63 collections_copied = {}
64
65 # Set of (repository, script_version) two-tuples of commits copied in git.
66 scripts_copied = set()
67
68 # The owner_uuid of the object being copied
69 src_owner_uuid = None
70
71 def main():
72     copy_opts = argparse.ArgumentParser(add_help=False)
73
74     copy_opts.add_argument(
75         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
76         help='Print version and exit.')
77     copy_opts.add_argument(
78         '-v', '--verbose', dest='verbose', action='store_true',
79         help='Verbose output.')
80     copy_opts.add_argument(
81         '--progress', dest='progress', action='store_true',
82         help='Report progress on copying collections. (default)')
83     copy_opts.add_argument(
84         '--no-progress', dest='progress', action='store_false',
85         help='Do not report progress on copying collections.')
86     copy_opts.add_argument(
87         '-f', '--force', dest='force', action='store_true',
88         help='Perform copy even if the object appears to exist at the remote destination.')
89     copy_opts.add_argument(
90         '--force-filters', action='store_true', default=False,
91         help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
92     copy_opts.add_argument(
93         '--src', dest='source_arvados', required=True,
94         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
95     copy_opts.add_argument(
96         '--dst', dest='destination_arvados', required=True,
97         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
98     copy_opts.add_argument(
99         '--recursive', dest='recursive', action='store_true',
100         help='Recursively copy any dependencies for this object. (default)')
101     copy_opts.add_argument(
102         '--no-recursive', dest='recursive', action='store_false',
103         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
104     copy_opts.add_argument(
105         '--dst-git-repo', dest='dst_git_repo',
106         help='The name of the destination git repository. Required when copying a pipeline recursively.')
107     copy_opts.add_argument(
108         '--project-uuid', dest='project_uuid',
109         help='The UUID of the project at the destination to which the pipeline should be copied.')
110     copy_opts.add_argument(
111         '--allow-git-http-src', action="store_true",
112         help='Allow cloning git repositories over insecure http')
113     copy_opts.add_argument(
114         '--allow-git-http-dst', action="store_true",
115         help='Allow pushing git repositories over insecure http')
116
117     copy_opts.add_argument(
118         'object_uuid',
119         help='The UUID of the object to be copied.')
120     copy_opts.set_defaults(progress=True)
121     copy_opts.set_defaults(recursive=True)
122
123     parser = argparse.ArgumentParser(
124         description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
125         parents=[copy_opts, arv_cmd.retry_opt])
126     args = parser.parse_args()
127
128     if args.verbose:
129         logger.setLevel(logging.DEBUG)
130     else:
131         logger.setLevel(logging.INFO)
132
133     # Create API clients for the source and destination instances
134     src_arv = api_for_instance(args.source_arvados)
135     dst_arv = api_for_instance(args.destination_arvados)
136
137     if not args.project_uuid:
138         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
139
140     # Identify the kind of object we have been given, and begin copying.
141     t = uuid_type(src_arv, args.object_uuid)
142     if t == 'Collection':
143         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
144         result = copy_collection(args.object_uuid,
145                                  src_arv, dst_arv,
146                                  args)
147     elif t == 'PipelineInstance':
148         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
149         result = copy_pipeline_instance(args.object_uuid,
150                                         src_arv, dst_arv,
151                                         args)
152     elif t == 'PipelineTemplate':
153         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
154         result = copy_pipeline_template(args.object_uuid,
155                                         src_arv, dst_arv, args)
156     elif t == 'Workflow':
157         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
158         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
159     else:
160         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
161
162     # Clean up any outstanding temp git repositories.
163     for d in listvalues(local_repo_dir):
164         shutil.rmtree(d, ignore_errors=True)
165
166     # If no exception was thrown and the response does not have an
167     # error_token field, presume success
168     if 'error_token' in result or 'uuid' not in result:
169         logger.error("API server returned an error result: {}".format(result))
170         exit(1)
171
172     logger.info("")
173     logger.info("Success: created copy with uuid {}".format(result['uuid']))
174     exit(0)
175
176 def set_src_owner_uuid(resource, uuid, args):
177     global src_owner_uuid
178     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
179     src_owner_uuid = c.get("owner_uuid")
180
181 # api_for_instance(instance_name)
182 #
183 #     Creates an API client for the Arvados instance identified by
184 #     instance_name.
185 #
186 #     If instance_name contains a slash, it is presumed to be a path
187 #     (either local or absolute) to a file with Arvados configuration
188 #     settings.
189 #
190 #     Otherwise, it is presumed to be the name of a file in
191 #     $HOME/.config/arvados/instance_name.conf
192 #
193 def api_for_instance(instance_name):
194     if '/' in instance_name:
195         config_file = instance_name
196     else:
197         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
198
199     try:
200         cfg = arvados.config.load(config_file)
201     except (IOError, OSError) as e:
202         abort(("Could not open config file {}: {}\n" +
203                "You must make sure that your configuration tokens\n" +
204                "for Arvados instance {} are in {} and that this\n" +
205                "file is readable.").format(
206                    config_file, e, instance_name, config_file))
207
208     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
209         api_is_insecure = (
210             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
211                 ['1', 't', 'true', 'y', 'yes']))
212         client = arvados.api('v1',
213                              host=cfg['ARVADOS_API_HOST'],
214                              token=cfg['ARVADOS_API_TOKEN'],
215                              insecure=api_is_insecure,
216                              model=OrderedJsonModel())
217     else:
218         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
219     return client
220
221 # Check if git is available
222 def check_git_availability():
223     try:
224         arvados.util.run_command(['git', '--help'])
225     except Exception:
226         abort('git command is not available. Please ensure git is installed.')
227
228 # copy_pipeline_instance(pi_uuid, src, dst, args)
229 #
230 #    Copies a pipeline instance identified by pi_uuid from src to dst.
231 #
232 #    If the args.recursive option is set:
233 #      1. Copies all input collections
234 #           * For each component in the pipeline, include all collections
235 #             listed as job dependencies for that component)
236 #      2. Copy docker images
237 #      3. Copy git repositories
238 #      4. Copy the pipeline template
239 #
240 #    The only changes made to the copied pipeline instance are:
241 #      1. The original pipeline instance UUID is preserved in
242 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
243 #      2. The pipeline_template_uuid is changed to the new template uuid.
244 #      3. The owner_uuid of the instance is changed to the user who
245 #         copied it.
246 #
247 def copy_pipeline_instance(pi_uuid, src, dst, args):
248     # Fetch the pipeline instance record.
249     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
250
251     if args.recursive:
252         check_git_availability()
253
254         if not args.dst_git_repo:
255             abort('--dst-git-repo is required when copying a pipeline recursively.')
256         # Copy the pipeline template and save the copied template.
257         if pi.get('pipeline_template_uuid', None):
258             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
259                                         src, dst, args)
260
261         # Copy input collections, docker images and git repos.
262         pi = copy_collections(pi, src, dst, args)
263         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
264         copy_docker_images(pi, src, dst, args)
265
266         # Update the fields of the pipeline instance with the copied
267         # pipeline template.
268         if pi.get('pipeline_template_uuid', None):
269             pi['pipeline_template_uuid'] = pt['uuid']
270
271     else:
272         # not recursive
273         logger.info("Copying only pipeline instance %s.", pi_uuid)
274         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
275
276     # Update the pipeline instance properties, and create the new
277     # instance at dst.
278     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
279     pi['description'] = "Pipeline copied from {}\n\n{}".format(
280         pi_uuid,
281         pi['description'] if pi.get('description', None) else '')
282
283     pi['owner_uuid'] = args.project_uuid
284
285     del pi['uuid']
286
287     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
288     return new_pi
289
290 def filter_iter(arg):
291     """Iterate a filter string-or-list.
292
293     Pass in a filter field that can either be a string or list.
294     This will iterate elements as if the field had been written as a list.
295     """
296     if isinstance(arg, basestring):
297         return iter((arg,))
298     else:
299         return iter(arg)
300
301 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
302     """Update a single repository filter in-place for the destination.
303
304     If the filter checks that the repository is src_repository, it is
305     updated to check that the repository is dst_repository.  If it does
306     anything else, this function raises ValueError.
307     """
308     if src_repository is None:
309         raise ValueError("component does not specify a source repository")
310     elif dst_repository is None:
311         raise ValueError("no destination repository specified to update repository filter")
312     elif repo_filter[1:] == ['=', src_repository]:
313         repo_filter[2] = dst_repository
314     elif repo_filter[1:] == ['in', [src_repository]]:
315         repo_filter[2] = [dst_repository]
316     else:
317         raise ValueError("repository filter is not a simple source match")
318
319 def migrate_script_version_filter(version_filter):
320     """Update a single script_version filter in-place for the destination.
321
322     Currently this function checks that all the filter operands are Git
323     commit hashes.  If they're not, it raises ValueError to indicate that
324     the filter is not portable.  It could be extended to make other
325     transformations in the future.
326     """
327     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
328         raise ValueError("script_version filter is not limited to commit hashes")
329
330 def attr_filtered(filter_, *attr_names):
331     """Return True if filter_ applies to any of attr_names, else False."""
332     return any((name == 'any') or (name in attr_names)
333                for name in filter_iter(filter_[0]))
334
335 @contextlib.contextmanager
336 def exception_handler(handler, *exc_types):
337     """If any exc_types are raised in the block, call handler on the exception."""
338     try:
339         yield
340     except exc_types as error:
341         handler(error)
342
343 def migrate_components_filters(template_components, dst_git_repo):
344     """Update template component filters in-place for the destination.
345
346     template_components is a dictionary of components in a pipeline template.
347     This method walks over each component's filters, and updates them to have
348     identical semantics on the destination cluster.  It returns a list of
349     error strings that describe what filters could not be updated safely.
350
351     dst_git_repo is the name of the destination Git repository, which can
352     be None if that is not known.
353     """
354     errors = []
355     for cname, cspec in template_components.items():
356         def add_error(errmsg):
357             errors.append("{}: {}".format(cname, errmsg))
358         if not isinstance(cspec, dict):
359             add_error("value is not a component definition")
360             continue
361         src_repository = cspec.get('repository')
362         filters = cspec.get('filters', [])
363         if not isinstance(filters, list):
364             add_error("filters are not a list")
365             continue
366         for cfilter in filters:
367             if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
368                 add_error("malformed filter {!r}".format(cfilter))
369                 continue
370             if attr_filtered(cfilter, 'repository'):
371                 with exception_handler(add_error, ValueError):
372                     migrate_repository_filter(cfilter, src_repository, dst_git_repo)
373             if attr_filtered(cfilter, 'script_version'):
374                 with exception_handler(add_error, ValueError):
375                     migrate_script_version_filter(cfilter)
376     return errors
377
378 # copy_pipeline_template(pt_uuid, src, dst, args)
379 #
380 #    Copies a pipeline template identified by pt_uuid from src to dst.
381 #
382 #    If args.recursive is True, also copy any collections, docker
383 #    images and git repositories that this template references.
384 #
385 #    The owner_uuid of the new template is changed to that of the user
386 #    who copied the template.
387 #
388 #    Returns the copied pipeline template object.
389 #
390 def copy_pipeline_template(pt_uuid, src, dst, args):
391     # fetch the pipeline template from the source instance
392     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
393
394     if not args.force_filters:
395         filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
396         if filter_errors:
397             abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
398                   "\n".join(filter_errors))
399
400     if args.recursive:
401         check_git_availability()
402
403         if not args.dst_git_repo:
404             abort('--dst-git-repo is required when copying a pipeline recursively.')
405         # Copy input collections, docker images and git repos.
406         pt = copy_collections(pt, src, dst, args)
407         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
408         copy_docker_images(pt, src, dst, args)
409
410     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
411         pt_uuid,
412         pt['description'] if pt.get('description', None) else '')
413     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
414     del pt['uuid']
415
416     pt['owner_uuid'] = args.project_uuid
417
418     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
419
420 # copy_workflow(wf_uuid, src, dst, args)
421 #
422 #    Copies a workflow identified by wf_uuid from src to dst.
423 #
424 #    If args.recursive is True, also copy any collections
425 #      referenced in the workflow definition yaml.
426 #
427 #    The owner_uuid of the new workflow is set to any given
428 #      project_uuid or the user who copied the template.
429 #
430 #    Returns the copied workflow object.
431 #
432 def copy_workflow(wf_uuid, src, dst, args):
433     # fetch the workflow from the source instance
434     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
435
436     # copy collections and docker images
437     if args.recursive:
438         wf_def = yaml.safe_load(wf["definition"])
439         if wf_def is not None:
440             locations = []
441             docker_images = {}
442             graph = wf_def.get('$graph', None)
443             if graph is not None:
444                 workflow_collections(graph, locations, docker_images)
445             else:
446                 workflow_collections(wf_def, locations, docker_images)
447
448             if locations:
449                 copy_collections(locations, src, dst, args)
450
451             for image in docker_images:
452                 copy_docker_image(image, docker_images[image], src, dst, args)
453
454     # copy the workflow itself
455     del wf['uuid']
456     wf['owner_uuid'] = args.project_uuid
457     return dst.workflows().create(body=wf).execute(num_retries=args.retries)
458
459 def workflow_collections(obj, locations, docker_images):
460     if isinstance(obj, dict):
461         loc = obj.get('location', None)
462         if loc is not None:
463             if loc.startswith("keep:"):
464                 locations.append(loc[5:])
465
466         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
467         if docker_image is not None:
468             ds = docker_image.split(":", 1)
469             tag = ds[1] if len(ds)==2 else 'latest'
470             docker_images[ds[0]] = tag
471
472         for x in obj:
473             workflow_collections(obj[x], locations, docker_images)
474     elif isinstance(obj, list):
475         for x in obj:
476             workflow_collections(x, locations, docker_images)
477
478 # copy_collections(obj, src, dst, args)
479 #
480 #    Recursively copies all collections referenced by 'obj' from src
481 #    to dst.  obj may be a dict or a list, in which case we run
482 #    copy_collections on every value it contains. If it is a string,
483 #    search it for any substring that matches a collection hash or uuid
484 #    (this will find hidden references to collections like
485 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
486 #
487 #    Returns a copy of obj with any old collection uuids replaced by
488 #    the new ones.
489 #
490 def copy_collections(obj, src, dst, args):
491
492     def copy_collection_fn(collection_match):
493         """Helper function for regex substitution: copies a single collection,
494         identified by the collection_match MatchObject, to the
495         destination.  Returns the destination collection uuid (or the
496         portable data hash if that's what src_id is).
497
498         """
499         src_id = collection_match.group(0)
500         if src_id not in collections_copied:
501             dst_col = copy_collection(src_id, src, dst, args)
502             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
503                 collections_copied[src_id] = src_id
504             else:
505                 collections_copied[src_id] = dst_col['uuid']
506         return collections_copied[src_id]
507
508     if isinstance(obj, basestring):
509         # Copy any collections identified in this string to dst, replacing
510         # them with the dst uuids as necessary.
511         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
512         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
513         return obj
514     elif isinstance(obj, dict):
515         return type(obj)((v, copy_collections(obj[v], src, dst, args))
516                          for v in obj)
517     elif isinstance(obj, list):
518         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
519     return obj
520
521 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
522     """Copy a job's script to the destination repository, and update its record.
523
524     Given a jobspec dictionary, this function finds the referenced script from
525     src and copies it to dst and dst_repo.  It also updates jobspec in place to
526     refer to names on the destination.
527     """
528     repo = jobspec.get('repository')
529     if repo is None:
530         return
531     # script_version is the "script_version" parameter from the source
532     # component or job.  If no script_version was supplied in the
533     # component or job, it is a mistake in the pipeline, but for the
534     # purposes of copying the repository, default to "master".
535     script_version = jobspec.get('script_version') or 'master'
536     script_key = (repo, script_version)
537     if script_key not in scripts_copied:
538         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
539         scripts_copied.add(script_key)
540     jobspec['repository'] = dst_repo
541     repo_dir = local_repo_dir[repo]
542     for version_key in ['script_version', 'supplied_script_version']:
543         if version_key in jobspec:
544             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
545
546 # copy_git_repos(p, src, dst, dst_repo, args)
547 #
548 #    Copies all git repositories referenced by pipeline instance or
549 #    template 'p' from src to dst.
550 #
551 #    For each component c in the pipeline:
552 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
553 #      * Rename script versions:
554 #          * c['script_version']
555 #          * c['job']['script_version']
556 #          * c['job']['supplied_script_version']
557 #        to the commit hashes they resolve to, since any symbolic
558 #        names (tags, branches) are not preserved in the destination repo.
559 #
560 #    The pipeline object is updated in place with the new repository
561 #    names.  The return value is undefined.
562 #
563 def copy_git_repos(p, src, dst, dst_repo, args):
564     for component in p['components'].values():
565         migrate_jobspec(component, src, dst, dst_repo, args)
566         if 'job' in component:
567             migrate_jobspec(component['job'], src, dst, dst_repo, args)
568
569 def total_collection_size(manifest_text):
570     """Return the total number of bytes in this collection (excluding
571     duplicate blocks)."""
572
573     total_bytes = 0
574     locators_seen = {}
575     for line in manifest_text.splitlines():
576         words = line.split()
577         for word in words[1:]:
578             try:
579                 loc = arvados.KeepLocator(word)
580             except ValueError:
581                 continue  # this word isn't a locator, skip it
582             if loc.md5sum not in locators_seen:
583                 locators_seen[loc.md5sum] = True
584                 total_bytes += loc.size
585
586     return total_bytes
587
588 def create_collection_from(c, src, dst, args):
589     """Create a new collection record on dst, and copy Docker metadata if
590     available."""
591
592     collection_uuid = c['uuid']
593     del c['uuid']
594
595     if not c["name"]:
596         c['name'] = "copied from " + collection_uuid
597
598     if 'properties' in c:
599         del c['properties']
600
601     c['owner_uuid'] = args.project_uuid
602
603     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
604
605     # Create docker_image_repo+tag and docker_image_hash links
606     # at the destination.
607     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
608         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
609
610         for src_link in docker_links:
611             body = {key: src_link[key]
612                     for key in ['link_class', 'name', 'properties']}
613             body['head_uuid'] = dst_collection['uuid']
614             body['owner_uuid'] = args.project_uuid
615
616             lk = dst.links().create(body=body).execute(num_retries=args.retries)
617             logger.debug('created dst link {}'.format(lk))
618
619     return dst_collection
620
621 # copy_collection(obj_uuid, src, dst, args)
622 #
623 #    Copies the collection identified by obj_uuid from src to dst.
624 #    Returns the collection object created at dst.
625 #
626 #    If args.progress is True, produce a human-friendly progress
627 #    report.
628 #
629 #    If a collection with the desired portable_data_hash already
630 #    exists at dst, and args.force is False, copy_collection returns
631 #    the existing collection without copying any blocks.  Otherwise
632 #    (if no collection exists or if args.force is True)
633 #    copy_collection copies all of the collection data blocks from src
634 #    to dst.
635 #
636 #    For this application, it is critical to preserve the
637 #    collection's manifest hash, which is not guaranteed with the
638 #    arvados.CollectionReader and arvados.CollectionWriter classes.
639 #    Copying each block in the collection manually, followed by
640 #    the manifest block, ensures that the collection's manifest
641 #    hash will not change.
642 #
643 def copy_collection(obj_uuid, src, dst, args):
644     if arvados.util.keep_locator_pattern.match(obj_uuid):
645         # If the obj_uuid is a portable data hash, it might not be uniquely
646         # identified with a particular collection.  As a result, it is
647         # ambigious as to what name to use for the copy.  Apply some heuristics
648         # to pick which collection to get the name from.
649         srccol = src.collections().list(
650             filters=[['portable_data_hash', '=', obj_uuid]],
651             order="created_at asc"
652             ).execute(num_retries=args.retries)
653
654         items = srccol.get("items")
655
656         if not items:
657             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
658             return
659
660         c = None
661
662         if len(items) == 1:
663             # There's only one collection with the PDH, so use that.
664             c = items[0]
665         if not c:
666             # See if there is a collection that's in the same project
667             # as the root item (usually a pipeline) being copied.
668             for i in items:
669                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
670                     c = i
671                     break
672         if not c:
673             # Didn't find any collections located in the same project, so
674             # pick the oldest collection that has a name assigned to it.
675             for i in items:
676                 if i.get("name"):
677                     c = i
678                     break
679         if not c:
680             # None of the collections have names (?!), so just pick the
681             # first one.
682             c = items[0]
683
684         # list() doesn't return manifest text (and we don't want it to,
685         # because we don't need the same maninfest text sent to us 50
686         # times) so go and retrieve the collection object directly
687         # which will include the manifest text.
688         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
689     else:
690         # Assume this is an actual collection uuid, so fetch it directly.
691         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
692
693     # If a collection with this hash already exists at the
694     # destination, and 'force' is not true, just return that
695     # collection.
696     if not args.force:
697         if 'portable_data_hash' in c:
698             colhash = c['portable_data_hash']
699         else:
700             colhash = c['uuid']
701         dstcol = dst.collections().list(
702             filters=[['portable_data_hash', '=', colhash]]
703         ).execute(num_retries=args.retries)
704         if dstcol['items_available'] > 0:
705             for d in dstcol['items']:
706                 if ((args.project_uuid == d['owner_uuid']) and
707                     (c.get('name') == d['name']) and
708                     (c['portable_data_hash'] == d['portable_data_hash'])):
709                     return d
710             c['manifest_text'] = dst.collections().get(
711                 uuid=dstcol['items'][0]['uuid']
712             ).execute(num_retries=args.retries)['manifest_text']
713             return create_collection_from(c, src, dst, args)
714
715     # Fetch the collection's manifest.
716     manifest = c['manifest_text']
717     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
718
719     # Copy each block from src_keep to dst_keep.
720     # Use the newly signed locators returned from dst_keep to build
721     # a new manifest as we go.
722     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
723     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
724     dst_manifest = ""
725     dst_locators = {}
726     bytes_written = 0
727     bytes_expected = total_collection_size(manifest)
728     if args.progress:
729         progress_writer = ProgressWriter(human_progress)
730     else:
731         progress_writer = None
732
733     for line in manifest.splitlines():
734         words = line.split()
735         dst_manifest += words[0]
736         for word in words[1:]:
737             try:
738                 loc = arvados.KeepLocator(word)
739             except ValueError:
740                 # If 'word' can't be parsed as a locator,
741                 # presume it's a filename.
742                 dst_manifest += ' ' + word
743                 continue
744             blockhash = loc.md5sum
745             # copy this block if we haven't seen it before
746             # (otherwise, just reuse the existing dst_locator)
747             if blockhash not in dst_locators:
748                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
749                 if progress_writer:
750                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
751                 data = src_keep.get(word)
752                 dst_locator = dst_keep.put(data)
753                 dst_locators[blockhash] = dst_locator
754                 bytes_written += loc.size
755             dst_manifest += ' ' + dst_locators[blockhash]
756         dst_manifest += "\n"
757
758     if progress_writer:
759         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
760         progress_writer.finish()
761
762     # Copy the manifest and save the collection.
763     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
764
765     c['manifest_text'] = dst_manifest
766     return create_collection_from(c, src, dst, args)
767
768 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
769     r = api.repositories().list(
770         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
771     if r['items_available'] != 1:
772         raise Exception('cannot identify repo {}; {} repos found'
773                         .format(repo_name, r['items_available']))
774
775     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
776     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
777     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
778
779     priority = https_url + other_url + http_url
780
781     git_config = []
782     git_url = None
783     for url in priority:
784         if url.startswith("http"):
785             u = urllib.parse.urlsplit(url)
786             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
787             git_config = ["-c", "credential.%s/.username=none" % baseurl,
788                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
789         else:
790             git_config = []
791
792         try:
793             logger.debug("trying %s", url)
794             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
795                                       env={"HOME": os.environ["HOME"],
796                                            "ARVADOS_API_TOKEN": api.api_token,
797                                            "GIT_ASKPASS": "/bin/false"})
798         except arvados.errors.CommandFailedError:
799             pass
800         else:
801             git_url = url
802             break
803
804     if not git_url:
805         raise Exception('Cannot access git repository, tried {}'
806                         .format(priority))
807
808     if git_url.startswith("http:"):
809         if allow_insecure_http:
810             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
811         else:
812             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
813
814     return (git_url, git_config)
815
816
817 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
818 #
819 #    Copies commits from git repository 'src_git_repo' on Arvados
820 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
821 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
822 #    or "jsmith")
823 #
824 #    All commits will be copied to a destination branch named for the
825 #    source repository URL.
826 #
827 #    The destination repository must already exist.
828 #
829 #    The user running this command must be authenticated
830 #    to both repositories.
831 #
832 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
833     # Identify the fetch and push URLs for the git repositories.
834
835     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
836     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
837
838     logger.debug('src_git_url: {}'.format(src_git_url))
839     logger.debug('dst_git_url: {}'.format(dst_git_url))
840
841     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
842
843     # Copy git commits from src repo to dst repo.
844     if src_git_repo not in local_repo_dir:
845         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
846         arvados.util.run_command(
847             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
848              local_repo_dir[src_git_repo]],
849             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
850             env={"HOME": os.environ["HOME"],
851                  "ARVADOS_API_TOKEN": src.api_token,
852                  "GIT_ASKPASS": "/bin/false"})
853         arvados.util.run_command(
854             ["git", "remote", "add", "dst", dst_git_url],
855             cwd=local_repo_dir[src_git_repo])
856     arvados.util.run_command(
857         ["git", "branch", dst_branch, script_version],
858         cwd=local_repo_dir[src_git_repo])
859     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
860                              cwd=local_repo_dir[src_git_repo],
861                              env={"HOME": os.environ["HOME"],
862                                   "ARVADOS_API_TOKEN": dst.api_token,
863                                   "GIT_ASKPASS": "/bin/false"})
864
865 def copy_docker_images(pipeline, src, dst, args):
866     """Copy any docker images named in the pipeline components'
867     runtime_constraints field from src to dst."""
868
869     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
870     for c_name, c_info in pipeline['components'].items():
871         if ('runtime_constraints' in c_info and
872             'docker_image' in c_info['runtime_constraints']):
873             copy_docker_image(
874                 c_info['runtime_constraints']['docker_image'],
875                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
876                 src, dst, args)
877
878
879 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
880     """Copy the docker image identified by docker_image and
881     docker_image_tag from src to dst. Create appropriate
882     docker_image_repo+tag and docker_image_hash links at dst.
883
884     """
885
886     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
887
888     # Find the link identifying this docker image.
889     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
890         src, args.retries, docker_image, docker_image_tag)
891     if docker_image_list:
892         image_uuid, image_info = docker_image_list[0]
893         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
894
895         # Copy the collection it refers to.
896         dst_image_col = copy_collection(image_uuid, src, dst, args)
897     elif arvados.util.keep_locator_pattern.match(docker_image):
898         dst_image_col = copy_collection(docker_image, src, dst, args)
899     else:
900         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
901
902 # git_rev_parse(rev, repo)
903 #
904 #    Returns the 40-character commit hash corresponding to 'rev' in
905 #    git repository 'repo' (which must be the path of a local git
906 #    repository)
907 #
908 def git_rev_parse(rev, repo):
909     gitout, giterr = arvados.util.run_command(
910         ['git', 'rev-parse', rev], cwd=repo)
911     return gitout.strip()
912
913 # uuid_type(api, object_uuid)
914 #
915 #    Returns the name of the class that object_uuid belongs to, based on
916 #    the second field of the uuid.  This function consults the api's
917 #    schema to identify the object class.
918 #
919 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
920 #
921 #    Special case: if handed a Keep locator hash, return 'Collection'.
922 #
923 def uuid_type(api, object_uuid):
924     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
925         return 'Collection'
926     p = object_uuid.split('-')
927     if len(p) == 3:
928         type_prefix = p[1]
929         for k in api._schema.schemas:
930             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
931             if type_prefix == obj_class:
932                 return k
933     return None
934
935 def abort(msg, code=1):
936     logger.info("arv-copy: %s", msg)
937     exit(code)
938
939
940 # Code for reporting on the progress of a collection upload.
941 # Stolen from arvados.commands.put.ArvPutCollectionWriter
942 # TODO(twp): figure out how to refactor into a shared library
943 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
944 # code)
945
946 def machine_progress(obj_uuid, bytes_written, bytes_expected):
947     return "{} {}: {} {} written {} total\n".format(
948         sys.argv[0],
949         os.getpid(),
950         obj_uuid,
951         bytes_written,
952         -1 if (bytes_expected is None) else bytes_expected)
953
954 def human_progress(obj_uuid, bytes_written, bytes_expected):
955     if bytes_expected:
956         return "\r{}: {}M / {}M {:.1%} ".format(
957             obj_uuid,
958             bytes_written >> 20, bytes_expected >> 20,
959             float(bytes_written) / bytes_expected)
960     else:
961         return "\r{}: {} ".format(obj_uuid, bytes_written)
962
963 class ProgressWriter(object):
964     _progress_func = None
965     outfile = sys.stderr
966
967     def __init__(self, progress_func):
968         self._progress_func = progress_func
969
970     def report(self, obj_uuid, bytes_written, bytes_expected):
971         if self._progress_func is not None:
972             self.outfile.write(
973                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
974
975     def finish(self):
976         self.outfile.write("\n")
977
978 if __name__ == '__main__':
979     main()