11308: Remove superfluous hashbangs
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
2 #
3 # Copies an object from Arvados instance src to instance dst.
4 #
5 # By default, arv-copy recursively copies any dependent objects
6 # necessary to make the object functional in the new instance
7 # (e.g. for a pipeline instance, arv-copy copies the pipeline
8 # template, input collection, docker images, git repositories). If
9 # --no-recursive is given, arv-copy copies only the single record
10 # identified by object-uuid.
11 #
12 # The user must have files $HOME/.config/arvados/{src}.conf and
13 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
14 # instances src and dst.  If either of these files is not found,
15 # arv-copy will issue an error.
16
17 from __future__ import division
18 from future import standard_library
19 standard_library.install_aliases()
20 from past.builtins import basestring
21 from builtins import object
22 import argparse
23 import contextlib
24 import getpass
25 import os
26 import re
27 import shutil
28 import sys
29 import logging
30 import tempfile
31 import urllib.parse
32
33 import arvados
34 import arvados.config
35 import arvados.keep
36 import arvados.util
37 import arvados.commands._util as arv_cmd
38 import arvados.commands.keepdocker
39 import ruamel.yaml as yaml
40
41 from arvados.api import OrderedJsonModel
42 from arvados._version import __version__
43
44 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
45
46 logger = logging.getLogger('arvados.arv-copy')
47
48 # local_repo_dir records which git repositories from the Arvados source
49 # instance have been checked out locally during this run, and to which
50 # directories.
51 # e.g. if repository 'twp' from src_arv has been cloned into
52 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
53 #
54 local_repo_dir = {}
55
56 # List of collections that have been copied in this session, and their
57 # destination collection UUIDs.
58 collections_copied = {}
59
60 # Set of (repository, script_version) two-tuples of commits copied in git.
61 scripts_copied = set()
62
63 # The owner_uuid of the object being copied
64 src_owner_uuid = None
65
66 def main():
67     copy_opts = argparse.ArgumentParser(add_help=False)
68
69     copy_opts.add_argument(
70         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
71         help='Print version and exit.')
72     copy_opts.add_argument(
73         '-v', '--verbose', dest='verbose', action='store_true',
74         help='Verbose output.')
75     copy_opts.add_argument(
76         '--progress', dest='progress', action='store_true',
77         help='Report progress on copying collections. (default)')
78     copy_opts.add_argument(
79         '--no-progress', dest='progress', action='store_false',
80         help='Do not report progress on copying collections.')
81     copy_opts.add_argument(
82         '-f', '--force', dest='force', action='store_true',
83         help='Perform copy even if the object appears to exist at the remote destination.')
84     copy_opts.add_argument(
85         '--force-filters', action='store_true', default=False,
86         help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
87     copy_opts.add_argument(
88         '--src', dest='source_arvados', required=True,
89         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
90     copy_opts.add_argument(
91         '--dst', dest='destination_arvados', required=True,
92         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
93     copy_opts.add_argument(
94         '--recursive', dest='recursive', action='store_true',
95         help='Recursively copy any dependencies for this object. (default)')
96     copy_opts.add_argument(
97         '--no-recursive', dest='recursive', action='store_false',
98         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
99     copy_opts.add_argument(
100         '--dst-git-repo', dest='dst_git_repo',
101         help='The name of the destination git repository. Required when copying a pipeline recursively.')
102     copy_opts.add_argument(
103         '--project-uuid', dest='project_uuid',
104         help='The UUID of the project at the destination to which the pipeline should be copied.')
105     copy_opts.add_argument(
106         '--allow-git-http-src', action="store_true",
107         help='Allow cloning git repositories over insecure http')
108     copy_opts.add_argument(
109         '--allow-git-http-dst', action="store_true",
110         help='Allow pushing git repositories over insecure http')
111
112     copy_opts.add_argument(
113         'object_uuid',
114         help='The UUID of the object to be copied.')
115     copy_opts.set_defaults(progress=True)
116     copy_opts.set_defaults(recursive=True)
117
118     parser = argparse.ArgumentParser(
119         description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
120         parents=[copy_opts, arv_cmd.retry_opt])
121     args = parser.parse_args()
122
123     if args.verbose:
124         logger.setLevel(logging.DEBUG)
125     else:
126         logger.setLevel(logging.INFO)
127
128     # Create API clients for the source and destination instances
129     src_arv = api_for_instance(args.source_arvados)
130     dst_arv = api_for_instance(args.destination_arvados)
131
132     if not args.project_uuid:
133         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
134
135     # Identify the kind of object we have been given, and begin copying.
136     t = uuid_type(src_arv, args.object_uuid)
137     if t == 'Collection':
138         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
139         result = copy_collection(args.object_uuid,
140                                  src_arv, dst_arv,
141                                  args)
142     elif t == 'PipelineInstance':
143         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
144         result = copy_pipeline_instance(args.object_uuid,
145                                         src_arv, dst_arv,
146                                         args)
147     elif t == 'PipelineTemplate':
148         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
149         result = copy_pipeline_template(args.object_uuid,
150                                         src_arv, dst_arv, args)
151     elif t == 'Workflow':
152         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
153         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
154     else:
155         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
156
157     # Clean up any outstanding temp git repositories.
158     for d in list(local_repo_dir.values()):
159         shutil.rmtree(d, ignore_errors=True)
160
161     # If no exception was thrown and the response does not have an
162     # error_token field, presume success
163     if 'error_token' in result or 'uuid' not in result:
164         logger.error("API server returned an error result: {}".format(result))
165         exit(1)
166
167     logger.info("")
168     logger.info("Success: created copy with uuid {}".format(result['uuid']))
169     exit(0)
170
171 def set_src_owner_uuid(resource, uuid, args):
172     global src_owner_uuid
173     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
174     src_owner_uuid = c.get("owner_uuid")
175
176 # api_for_instance(instance_name)
177 #
178 #     Creates an API client for the Arvados instance identified by
179 #     instance_name.
180 #
181 #     If instance_name contains a slash, it is presumed to be a path
182 #     (either local or absolute) to a file with Arvados configuration
183 #     settings.
184 #
185 #     Otherwise, it is presumed to be the name of a file in
186 #     $HOME/.config/arvados/instance_name.conf
187 #
188 def api_for_instance(instance_name):
189     if '/' in instance_name:
190         config_file = instance_name
191     else:
192         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
193
194     try:
195         cfg = arvados.config.load(config_file)
196     except (IOError, OSError) as e:
197         abort(("Could not open config file {}: {}\n" +
198                "You must make sure that your configuration tokens\n" +
199                "for Arvados instance {} are in {} and that this\n" +
200                "file is readable.").format(
201                    config_file, e, instance_name, config_file))
202
203     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
204         api_is_insecure = (
205             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
206                 ['1', 't', 'true', 'y', 'yes']))
207         client = arvados.api('v1',
208                              host=cfg['ARVADOS_API_HOST'],
209                              token=cfg['ARVADOS_API_TOKEN'],
210                              insecure=api_is_insecure,
211                              model=OrderedJsonModel())
212     else:
213         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
214     return client
215
216 # Check if git is available
217 def check_git_availability():
218     try:
219         arvados.util.run_command(['git', '--help'])
220     except Exception:
221         abort('git command is not available. Please ensure git is installed.')
222
223 # copy_pipeline_instance(pi_uuid, src, dst, args)
224 #
225 #    Copies a pipeline instance identified by pi_uuid from src to dst.
226 #
227 #    If the args.recursive option is set:
228 #      1. Copies all input collections
229 #           * For each component in the pipeline, include all collections
230 #             listed as job dependencies for that component)
231 #      2. Copy docker images
232 #      3. Copy git repositories
233 #      4. Copy the pipeline template
234 #
235 #    The only changes made to the copied pipeline instance are:
236 #      1. The original pipeline instance UUID is preserved in
237 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
238 #      2. The pipeline_template_uuid is changed to the new template uuid.
239 #      3. The owner_uuid of the instance is changed to the user who
240 #         copied it.
241 #
242 def copy_pipeline_instance(pi_uuid, src, dst, args):
243     # Fetch the pipeline instance record.
244     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
245
246     if args.recursive:
247         check_git_availability()
248
249         if not args.dst_git_repo:
250             abort('--dst-git-repo is required when copying a pipeline recursively.')
251         # Copy the pipeline template and save the copied template.
252         if pi.get('pipeline_template_uuid', None):
253             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
254                                         src, dst, args)
255
256         # Copy input collections, docker images and git repos.
257         pi = copy_collections(pi, src, dst, args)
258         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
259         copy_docker_images(pi, src, dst, args)
260
261         # Update the fields of the pipeline instance with the copied
262         # pipeline template.
263         if pi.get('pipeline_template_uuid', None):
264             pi['pipeline_template_uuid'] = pt['uuid']
265
266     else:
267         # not recursive
268         logger.info("Copying only pipeline instance %s.", pi_uuid)
269         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
270
271     # Update the pipeline instance properties, and create the new
272     # instance at dst.
273     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
274     pi['description'] = "Pipeline copied from {}\n\n{}".format(
275         pi_uuid,
276         pi['description'] if pi.get('description', None) else '')
277
278     pi['owner_uuid'] = args.project_uuid
279
280     del pi['uuid']
281
282     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
283     return new_pi
284
285 def filter_iter(arg):
286     """Iterate a filter string-or-list.
287
288     Pass in a filter field that can either be a string or list.
289     This will iterate elements as if the field had been written as a list.
290     """
291     if isinstance(arg, basestring):
292         return iter((arg,))
293     else:
294         return iter(arg)
295
296 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
297     """Update a single repository filter in-place for the destination.
298
299     If the filter checks that the repository is src_repository, it is
300     updated to check that the repository is dst_repository.  If it does
301     anything else, this function raises ValueError.
302     """
303     if src_repository is None:
304         raise ValueError("component does not specify a source repository")
305     elif dst_repository is None:
306         raise ValueError("no destination repository specified to update repository filter")
307     elif repo_filter[1:] == ['=', src_repository]:
308         repo_filter[2] = dst_repository
309     elif repo_filter[1:] == ['in', [src_repository]]:
310         repo_filter[2] = [dst_repository]
311     else:
312         raise ValueError("repository filter is not a simple source match")
313
314 def migrate_script_version_filter(version_filter):
315     """Update a single script_version filter in-place for the destination.
316
317     Currently this function checks that all the filter operands are Git
318     commit hashes.  If they're not, it raises ValueError to indicate that
319     the filter is not portable.  It could be extended to make other
320     transformations in the future.
321     """
322     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
323         raise ValueError("script_version filter is not limited to commit hashes")
324
325 def attr_filtered(filter_, *attr_names):
326     """Return True if filter_ applies to any of attr_names, else False."""
327     return any((name == 'any') or (name in attr_names)
328                for name in filter_iter(filter_[0]))
329
330 @contextlib.contextmanager
331 def exception_handler(handler, *exc_types):
332     """If any exc_types are raised in the block, call handler on the exception."""
333     try:
334         yield
335     except exc_types as error:
336         handler(error)
337
338 def migrate_components_filters(template_components, dst_git_repo):
339     """Update template component filters in-place for the destination.
340
341     template_components is a dictionary of components in a pipeline template.
342     This method walks over each component's filters, and updates them to have
343     identical semantics on the destination cluster.  It returns a list of
344     error strings that describe what filters could not be updated safely.
345
346     dst_git_repo is the name of the destination Git repository, which can
347     be None if that is not known.
348     """
349     errors = []
350     for cname, cspec in template_components.items():
351         def add_error(errmsg):
352             errors.append("{}: {}".format(cname, errmsg))
353         if not isinstance(cspec, dict):
354             add_error("value is not a component definition")
355             continue
356         src_repository = cspec.get('repository')
357         filters = cspec.get('filters', [])
358         if not isinstance(filters, list):
359             add_error("filters are not a list")
360             continue
361         for cfilter in filters:
362             if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
363                 add_error("malformed filter {!r}".format(cfilter))
364                 continue
365             if attr_filtered(cfilter, 'repository'):
366                 with exception_handler(add_error, ValueError):
367                     migrate_repository_filter(cfilter, src_repository, dst_git_repo)
368             if attr_filtered(cfilter, 'script_version'):
369                 with exception_handler(add_error, ValueError):
370                     migrate_script_version_filter(cfilter)
371     return errors
372
373 # copy_pipeline_template(pt_uuid, src, dst, args)
374 #
375 #    Copies a pipeline template identified by pt_uuid from src to dst.
376 #
377 #    If args.recursive is True, also copy any collections, docker
378 #    images and git repositories that this template references.
379 #
380 #    The owner_uuid of the new template is changed to that of the user
381 #    who copied the template.
382 #
383 #    Returns the copied pipeline template object.
384 #
385 def copy_pipeline_template(pt_uuid, src, dst, args):
386     # fetch the pipeline template from the source instance
387     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
388
389     if not args.force_filters:
390         filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
391         if filter_errors:
392             abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
393                   "\n".join(filter_errors))
394
395     if args.recursive:
396         check_git_availability()
397
398         if not args.dst_git_repo:
399             abort('--dst-git-repo is required when copying a pipeline recursively.')
400         # Copy input collections, docker images and git repos.
401         pt = copy_collections(pt, src, dst, args)
402         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
403         copy_docker_images(pt, src, dst, args)
404
405     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
406         pt_uuid,
407         pt['description'] if pt.get('description', None) else '')
408     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
409     del pt['uuid']
410
411     pt['owner_uuid'] = args.project_uuid
412
413     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
414
415 # copy_workflow(wf_uuid, src, dst, args)
416 #
417 #    Copies a workflow identified by wf_uuid from src to dst.
418 #
419 #    If args.recursive is True, also copy any collections
420 #      referenced in the workflow definition yaml.
421 #
422 #    The owner_uuid of the new workflow is set to any given
423 #      project_uuid or the user who copied the template.
424 #
425 #    Returns the copied workflow object.
426 #
427 def copy_workflow(wf_uuid, src, dst, args):
428     # fetch the workflow from the source instance
429     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
430
431     # copy collections and docker images
432     if args.recursive:
433         wf_def = yaml.safe_load(wf["definition"])
434         if wf_def is not None:
435             locations = []
436             docker_images = {}
437             graph = wf_def.get('$graph', None)
438             if graph is not None:
439                 workflow_collections(graph, locations, docker_images)
440             else:
441                 workflow_collections(wf_def, locations, docker_images)
442
443             if locations:
444                 copy_collections(locations, src, dst, args)
445
446             for image in docker_images:
447                 copy_docker_image(image, docker_images[image], src, dst, args)
448
449     # copy the workflow itself
450     del wf['uuid']
451     wf['owner_uuid'] = args.project_uuid
452     return dst.workflows().create(body=wf).execute(num_retries=args.retries)
453
454 def workflow_collections(obj, locations, docker_images):
455     if isinstance(obj, dict):
456         loc = obj.get('location', None)
457         if loc is not None:
458             if loc.startswith("keep:"):
459                 locations.append(loc[5:])
460
461         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
462         if docker_image is not None:
463             ds = docker_image.split(":", 1)
464             tag = ds[1] if len(ds)==2 else 'latest'
465             docker_images[ds[0]] = tag
466
467         for x in obj:
468             workflow_collections(obj[x], locations, docker_images)
469     elif isinstance(obj, list):
470         for x in obj:
471             workflow_collections(x, locations, docker_images)
472
473 # copy_collections(obj, src, dst, args)
474 #
475 #    Recursively copies all collections referenced by 'obj' from src
476 #    to dst.  obj may be a dict or a list, in which case we run
477 #    copy_collections on every value it contains. If it is a string,
478 #    search it for any substring that matches a collection hash or uuid
479 #    (this will find hidden references to collections like
480 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
481 #
482 #    Returns a copy of obj with any old collection uuids replaced by
483 #    the new ones.
484 #
485 def copy_collections(obj, src, dst, args):
486
487     def copy_collection_fn(collection_match):
488         """Helper function for regex substitution: copies a single collection,
489         identified by the collection_match MatchObject, to the
490         destination.  Returns the destination collection uuid (or the
491         portable data hash if that's what src_id is).
492
493         """
494         src_id = collection_match.group(0)
495         if src_id not in collections_copied:
496             dst_col = copy_collection(src_id, src, dst, args)
497             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
498                 collections_copied[src_id] = src_id
499             else:
500                 collections_copied[src_id] = dst_col['uuid']
501         return collections_copied[src_id]
502
503     if isinstance(obj, basestring):
504         # Copy any collections identified in this string to dst, replacing
505         # them with the dst uuids as necessary.
506         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
507         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
508         return obj
509     elif isinstance(obj, dict):
510         return type(obj)((v, copy_collections(obj[v], src, dst, args))
511                          for v in obj)
512     elif isinstance(obj, list):
513         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
514     return obj
515
516 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
517     """Copy a job's script to the destination repository, and update its record.
518
519     Given a jobspec dictionary, this function finds the referenced script from
520     src and copies it to dst and dst_repo.  It also updates jobspec in place to
521     refer to names on the destination.
522     """
523     repo = jobspec.get('repository')
524     if repo is None:
525         return
526     # script_version is the "script_version" parameter from the source
527     # component or job.  If no script_version was supplied in the
528     # component or job, it is a mistake in the pipeline, but for the
529     # purposes of copying the repository, default to "master".
530     script_version = jobspec.get('script_version') or 'master'
531     script_key = (repo, script_version)
532     if script_key not in scripts_copied:
533         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
534         scripts_copied.add(script_key)
535     jobspec['repository'] = dst_repo
536     repo_dir = local_repo_dir[repo]
537     for version_key in ['script_version', 'supplied_script_version']:
538         if version_key in jobspec:
539             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
540
541 # copy_git_repos(p, src, dst, dst_repo, args)
542 #
543 #    Copies all git repositories referenced by pipeline instance or
544 #    template 'p' from src to dst.
545 #
546 #    For each component c in the pipeline:
547 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
548 #      * Rename script versions:
549 #          * c['script_version']
550 #          * c['job']['script_version']
551 #          * c['job']['supplied_script_version']
552 #        to the commit hashes they resolve to, since any symbolic
553 #        names (tags, branches) are not preserved in the destination repo.
554 #
555 #    The pipeline object is updated in place with the new repository
556 #    names.  The return value is undefined.
557 #
558 def copy_git_repos(p, src, dst, dst_repo, args):
559     for component in p['components'].values():
560         migrate_jobspec(component, src, dst, dst_repo, args)
561         if 'job' in component:
562             migrate_jobspec(component['job'], src, dst, dst_repo, args)
563
564 def total_collection_size(manifest_text):
565     """Return the total number of bytes in this collection (excluding
566     duplicate blocks)."""
567
568     total_bytes = 0
569     locators_seen = {}
570     for line in manifest_text.splitlines():
571         words = line.split()
572         for word in words[1:]:
573             try:
574                 loc = arvados.KeepLocator(word)
575             except ValueError:
576                 continue  # this word isn't a locator, skip it
577             if loc.md5sum not in locators_seen:
578                 locators_seen[loc.md5sum] = True
579                 total_bytes += loc.size
580
581     return total_bytes
582
583 def create_collection_from(c, src, dst, args):
584     """Create a new collection record on dst, and copy Docker metadata if
585     available."""
586
587     collection_uuid = c['uuid']
588     del c['uuid']
589
590     if not c["name"]:
591         c['name'] = "copied from " + collection_uuid
592
593     if 'properties' in c:
594         del c['properties']
595
596     c['owner_uuid'] = args.project_uuid
597
598     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
599
600     # Create docker_image_repo+tag and docker_image_hash links
601     # at the destination.
602     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
603         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
604
605         for src_link in docker_links:
606             body = {key: src_link[key]
607                     for key in ['link_class', 'name', 'properties']}
608             body['head_uuid'] = dst_collection['uuid']
609             body['owner_uuid'] = args.project_uuid
610
611             lk = dst.links().create(body=body).execute(num_retries=args.retries)
612             logger.debug('created dst link {}'.format(lk))
613
614     return dst_collection
615
616 # copy_collection(obj_uuid, src, dst, args)
617 #
618 #    Copies the collection identified by obj_uuid from src to dst.
619 #    Returns the collection object created at dst.
620 #
621 #    If args.progress is True, produce a human-friendly progress
622 #    report.
623 #
624 #    If a collection with the desired portable_data_hash already
625 #    exists at dst, and args.force is False, copy_collection returns
626 #    the existing collection without copying any blocks.  Otherwise
627 #    (if no collection exists or if args.force is True)
628 #    copy_collection copies all of the collection data blocks from src
629 #    to dst.
630 #
631 #    For this application, it is critical to preserve the
632 #    collection's manifest hash, which is not guaranteed with the
633 #    arvados.CollectionReader and arvados.CollectionWriter classes.
634 #    Copying each block in the collection manually, followed by
635 #    the manifest block, ensures that the collection's manifest
636 #    hash will not change.
637 #
638 def copy_collection(obj_uuid, src, dst, args):
639     if arvados.util.keep_locator_pattern.match(obj_uuid):
640         # If the obj_uuid is a portable data hash, it might not be uniquely
641         # identified with a particular collection.  As a result, it is
642         # ambigious as to what name to use for the copy.  Apply some heuristics
643         # to pick which collection to get the name from.
644         srccol = src.collections().list(
645             filters=[['portable_data_hash', '=', obj_uuid]],
646             order="created_at asc"
647             ).execute(num_retries=args.retries)
648
649         items = srccol.get("items")
650
651         if not items:
652             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
653             return
654
655         c = None
656
657         if len(items) == 1:
658             # There's only one collection with the PDH, so use that.
659             c = items[0]
660         if not c:
661             # See if there is a collection that's in the same project
662             # as the root item (usually a pipeline) being copied.
663             for i in items:
664                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
665                     c = i
666                     break
667         if not c:
668             # Didn't find any collections located in the same project, so
669             # pick the oldest collection that has a name assigned to it.
670             for i in items:
671                 if i.get("name"):
672                     c = i
673                     break
674         if not c:
675             # None of the collections have names (?!), so just pick the
676             # first one.
677             c = items[0]
678
679         # list() doesn't return manifest text (and we don't want it to,
680         # because we don't need the same maninfest text sent to us 50
681         # times) so go and retrieve the collection object directly
682         # which will include the manifest text.
683         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
684     else:
685         # Assume this is an actual collection uuid, so fetch it directly.
686         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
687
688     # If a collection with this hash already exists at the
689     # destination, and 'force' is not true, just return that
690     # collection.
691     if not args.force:
692         if 'portable_data_hash' in c:
693             colhash = c['portable_data_hash']
694         else:
695             colhash = c['uuid']
696         dstcol = dst.collections().list(
697             filters=[['portable_data_hash', '=', colhash]]
698         ).execute(num_retries=args.retries)
699         if dstcol['items_available'] > 0:
700             for d in dstcol['items']:
701                 if ((args.project_uuid == d['owner_uuid']) and
702                     (c.get('name') == d['name']) and
703                     (c['portable_data_hash'] == d['portable_data_hash'])):
704                     return d
705             c['manifest_text'] = dst.collections().get(
706                 uuid=dstcol['items'][0]['uuid']
707             ).execute(num_retries=args.retries)['manifest_text']
708             return create_collection_from(c, src, dst, args)
709
710     # Fetch the collection's manifest.
711     manifest = c['manifest_text']
712     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
713
714     # Copy each block from src_keep to dst_keep.
715     # Use the newly signed locators returned from dst_keep to build
716     # a new manifest as we go.
717     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
718     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
719     dst_manifest = ""
720     dst_locators = {}
721     bytes_written = 0
722     bytes_expected = total_collection_size(manifest)
723     if args.progress:
724         progress_writer = ProgressWriter(human_progress)
725     else:
726         progress_writer = None
727
728     for line in manifest.splitlines():
729         words = line.split()
730         dst_manifest += words[0]
731         for word in words[1:]:
732             try:
733                 loc = arvados.KeepLocator(word)
734             except ValueError:
735                 # If 'word' can't be parsed as a locator,
736                 # presume it's a filename.
737                 dst_manifest += ' ' + word
738                 continue
739             blockhash = loc.md5sum
740             # copy this block if we haven't seen it before
741             # (otherwise, just reuse the existing dst_locator)
742             if blockhash not in dst_locators:
743                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
744                 if progress_writer:
745                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
746                 data = src_keep.get(word)
747                 dst_locator = dst_keep.put(data)
748                 dst_locators[blockhash] = dst_locator
749                 bytes_written += loc.size
750             dst_manifest += ' ' + dst_locators[blockhash]
751         dst_manifest += "\n"
752
753     if progress_writer:
754         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
755         progress_writer.finish()
756
757     # Copy the manifest and save the collection.
758     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
759
760     c['manifest_text'] = dst_manifest
761     return create_collection_from(c, src, dst, args)
762
763 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
764     r = api.repositories().list(
765         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
766     if r['items_available'] != 1:
767         raise Exception('cannot identify repo {}; {} repos found'
768                         .format(repo_name, r['items_available']))
769
770     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
771     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
772     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
773
774     priority = https_url + other_url + http_url
775
776     git_config = []
777     git_url = None
778     for url in priority:
779         if url.startswith("http"):
780             u = urllib.parse.urlsplit(url)
781             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
782             git_config = ["-c", "credential.%s/.username=none" % baseurl,
783                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
784         else:
785             git_config = []
786
787         try:
788             logger.debug("trying %s", url)
789             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
790                                       env={"HOME": os.environ["HOME"],
791                                            "ARVADOS_API_TOKEN": api.api_token,
792                                            "GIT_ASKPASS": "/bin/false"})
793         except arvados.errors.CommandFailedError:
794             pass
795         else:
796             git_url = url
797             break
798
799     if not git_url:
800         raise Exception('Cannot access git repository, tried {}'
801                         .format(priority))
802
803     if git_url.startswith("http:"):
804         if allow_insecure_http:
805             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
806         else:
807             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
808
809     return (git_url, git_config)
810
811
812 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
813 #
814 #    Copies commits from git repository 'src_git_repo' on Arvados
815 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
816 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
817 #    or "jsmith")
818 #
819 #    All commits will be copied to a destination branch named for the
820 #    source repository URL.
821 #
822 #    The destination repository must already exist.
823 #
824 #    The user running this command must be authenticated
825 #    to both repositories.
826 #
827 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
828     # Identify the fetch and push URLs for the git repositories.
829
830     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
831     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
832
833     logger.debug('src_git_url: {}'.format(src_git_url))
834     logger.debug('dst_git_url: {}'.format(dst_git_url))
835
836     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
837
838     # Copy git commits from src repo to dst repo.
839     if src_git_repo not in local_repo_dir:
840         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
841         arvados.util.run_command(
842             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
843              local_repo_dir[src_git_repo]],
844             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
845             env={"HOME": os.environ["HOME"],
846                  "ARVADOS_API_TOKEN": src.api_token,
847                  "GIT_ASKPASS": "/bin/false"})
848         arvados.util.run_command(
849             ["git", "remote", "add", "dst", dst_git_url],
850             cwd=local_repo_dir[src_git_repo])
851     arvados.util.run_command(
852         ["git", "branch", dst_branch, script_version],
853         cwd=local_repo_dir[src_git_repo])
854     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
855                              cwd=local_repo_dir[src_git_repo],
856                              env={"HOME": os.environ["HOME"],
857                                   "ARVADOS_API_TOKEN": dst.api_token,
858                                   "GIT_ASKPASS": "/bin/false"})
859
860 def copy_docker_images(pipeline, src, dst, args):
861     """Copy any docker images named in the pipeline components'
862     runtime_constraints field from src to dst."""
863
864     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
865     for c_name, c_info in pipeline['components'].items():
866         if ('runtime_constraints' in c_info and
867             'docker_image' in c_info['runtime_constraints']):
868             copy_docker_image(
869                 c_info['runtime_constraints']['docker_image'],
870                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
871                 src, dst, args)
872
873
874 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
875     """Copy the docker image identified by docker_image and
876     docker_image_tag from src to dst. Create appropriate
877     docker_image_repo+tag and docker_image_hash links at dst.
878
879     """
880
881     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
882
883     # Find the link identifying this docker image.
884     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
885         src, args.retries, docker_image, docker_image_tag)
886     if docker_image_list:
887         image_uuid, image_info = docker_image_list[0]
888         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
889
890         # Copy the collection it refers to.
891         dst_image_col = copy_collection(image_uuid, src, dst, args)
892     elif arvados.util.keep_locator_pattern.match(docker_image):
893         dst_image_col = copy_collection(docker_image, src, dst, args)
894     else:
895         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
896
897 # git_rev_parse(rev, repo)
898 #
899 #    Returns the 40-character commit hash corresponding to 'rev' in
900 #    git repository 'repo' (which must be the path of a local git
901 #    repository)
902 #
903 def git_rev_parse(rev, repo):
904     gitout, giterr = arvados.util.run_command(
905         ['git', 'rev-parse', rev], cwd=repo)
906     return gitout.strip()
907
908 # uuid_type(api, object_uuid)
909 #
910 #    Returns the name of the class that object_uuid belongs to, based on
911 #    the second field of the uuid.  This function consults the api's
912 #    schema to identify the object class.
913 #
914 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
915 #
916 #    Special case: if handed a Keep locator hash, return 'Collection'.
917 #
918 def uuid_type(api, object_uuid):
919     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
920         return 'Collection'
921     p = object_uuid.split('-')
922     if len(p) == 3:
923         type_prefix = p[1]
924         for k in api._schema.schemas:
925             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
926             if type_prefix == obj_class:
927                 return k
928     return None
929
930 def abort(msg, code=1):
931     logger.info("arv-copy: %s", msg)
932     exit(code)
933
934
935 # Code for reporting on the progress of a collection upload.
936 # Stolen from arvados.commands.put.ArvPutCollectionWriter
937 # TODO(twp): figure out how to refactor into a shared library
938 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
939 # code)
940
941 def machine_progress(obj_uuid, bytes_written, bytes_expected):
942     return "{} {}: {} {} written {} total\n".format(
943         sys.argv[0],
944         os.getpid(),
945         obj_uuid,
946         bytes_written,
947         -1 if (bytes_expected is None) else bytes_expected)
948
949 def human_progress(obj_uuid, bytes_written, bytes_expected):
950     if bytes_expected:
951         return "\r{}: {}M / {}M {:.1%} ".format(
952             obj_uuid,
953             bytes_written >> 20, bytes_expected >> 20,
954             float(bytes_written) / bytes_expected)
955     else:
956         return "\r{}: {} ".format(obj_uuid, bytes_written)
957
958 class ProgressWriter(object):
959     _progress_func = None
960     outfile = sys.stderr
961
962     def __init__(self, progress_func):
963         self._progress_func = progress_func
964
965     def report(self, obj_uuid, bytes_written, bytes_expected):
966         if self._progress_func is not None:
967             self.outfile.write(
968                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
969
970     def finish(self):
971         self.outfile.write("\n")
972
973 if __name__ == '__main__':
974     main()