11308: Fix deprecated logger.warn -> logger.warning.
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 from __future__ import division
20 from future import standard_library
21 standard_library.install_aliases()
22 from past.builtins import basestring
23 from builtins import object
24 import argparse
25 import contextlib
26 import getpass
27 import os
28 import re
29 import shutil
30 import sys
31 import logging
32 import tempfile
33 import urllib.parse
34
35 import arvados
36 import arvados.config
37 import arvados.keep
38 import arvados.util
39 import arvados.commands._util as arv_cmd
40 import arvados.commands.keepdocker
41 import ruamel.yaml as yaml
42
43 from arvados.api import OrderedJsonModel
44 from arvados._version import __version__
45
46 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
47
48 logger = logging.getLogger('arvados.arv-copy')
49
50 # local_repo_dir records which git repositories from the Arvados source
51 # instance have been checked out locally during this run, and to which
52 # directories.
53 # e.g. if repository 'twp' from src_arv has been cloned into
54 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
55 #
56 local_repo_dir = {}
57
58 # List of collections that have been copied in this session, and their
59 # destination collection UUIDs.
60 collections_copied = {}
61
62 # Set of (repository, script_version) two-tuples of commits copied in git.
63 scripts_copied = set()
64
65 # The owner_uuid of the object being copied
66 src_owner_uuid = None
67
68 def main():
69     copy_opts = argparse.ArgumentParser(add_help=False)
70
71     copy_opts.add_argument(
72         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
73         help='Print version and exit.')
74     copy_opts.add_argument(
75         '-v', '--verbose', dest='verbose', action='store_true',
76         help='Verbose output.')
77     copy_opts.add_argument(
78         '--progress', dest='progress', action='store_true',
79         help='Report progress on copying collections. (default)')
80     copy_opts.add_argument(
81         '--no-progress', dest='progress', action='store_false',
82         help='Do not report progress on copying collections.')
83     copy_opts.add_argument(
84         '-f', '--force', dest='force', action='store_true',
85         help='Perform copy even if the object appears to exist at the remote destination.')
86     copy_opts.add_argument(
87         '--force-filters', action='store_true', default=False,
88         help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
89     copy_opts.add_argument(
90         '--src', dest='source_arvados', required=True,
91         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
92     copy_opts.add_argument(
93         '--dst', dest='destination_arvados', required=True,
94         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
95     copy_opts.add_argument(
96         '--recursive', dest='recursive', action='store_true',
97         help='Recursively copy any dependencies for this object. (default)')
98     copy_opts.add_argument(
99         '--no-recursive', dest='recursive', action='store_false',
100         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
101     copy_opts.add_argument(
102         '--dst-git-repo', dest='dst_git_repo',
103         help='The name of the destination git repository. Required when copying a pipeline recursively.')
104     copy_opts.add_argument(
105         '--project-uuid', dest='project_uuid',
106         help='The UUID of the project at the destination to which the pipeline should be copied.')
107     copy_opts.add_argument(
108         '--allow-git-http-src', action="store_true",
109         help='Allow cloning git repositories over insecure http')
110     copy_opts.add_argument(
111         '--allow-git-http-dst', action="store_true",
112         help='Allow pushing git repositories over insecure http')
113
114     copy_opts.add_argument(
115         'object_uuid',
116         help='The UUID of the object to be copied.')
117     copy_opts.set_defaults(progress=True)
118     copy_opts.set_defaults(recursive=True)
119
120     parser = argparse.ArgumentParser(
121         description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
122         parents=[copy_opts, arv_cmd.retry_opt])
123     args = parser.parse_args()
124
125     if args.verbose:
126         logger.setLevel(logging.DEBUG)
127     else:
128         logger.setLevel(logging.INFO)
129
130     # Create API clients for the source and destination instances
131     src_arv = api_for_instance(args.source_arvados)
132     dst_arv = api_for_instance(args.destination_arvados)
133
134     if not args.project_uuid:
135         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
136
137     # Identify the kind of object we have been given, and begin copying.
138     t = uuid_type(src_arv, args.object_uuid)
139     if t == 'Collection':
140         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
141         result = copy_collection(args.object_uuid,
142                                  src_arv, dst_arv,
143                                  args)
144     elif t == 'PipelineInstance':
145         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
146         result = copy_pipeline_instance(args.object_uuid,
147                                         src_arv, dst_arv,
148                                         args)
149     elif t == 'PipelineTemplate':
150         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
151         result = copy_pipeline_template(args.object_uuid,
152                                         src_arv, dst_arv, args)
153     elif t == 'Workflow':
154         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
155         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
156     else:
157         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
158
159     # Clean up any outstanding temp git repositories.
160     for d in list(local_repo_dir.values()):
161         shutil.rmtree(d, ignore_errors=True)
162
163     # If no exception was thrown and the response does not have an
164     # error_token field, presume success
165     if 'error_token' in result or 'uuid' not in result:
166         logger.error("API server returned an error result: {}".format(result))
167         exit(1)
168
169     logger.info("")
170     logger.info("Success: created copy with uuid {}".format(result['uuid']))
171     exit(0)
172
173 def set_src_owner_uuid(resource, uuid, args):
174     global src_owner_uuid
175     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
176     src_owner_uuid = c.get("owner_uuid")
177
178 # api_for_instance(instance_name)
179 #
180 #     Creates an API client for the Arvados instance identified by
181 #     instance_name.
182 #
183 #     If instance_name contains a slash, it is presumed to be a path
184 #     (either local or absolute) to a file with Arvados configuration
185 #     settings.
186 #
187 #     Otherwise, it is presumed to be the name of a file in
188 #     $HOME/.config/arvados/instance_name.conf
189 #
190 def api_for_instance(instance_name):
191     if '/' in instance_name:
192         config_file = instance_name
193     else:
194         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
195
196     try:
197         cfg = arvados.config.load(config_file)
198     except (IOError, OSError) as e:
199         abort(("Could not open config file {}: {}\n" +
200                "You must make sure that your configuration tokens\n" +
201                "for Arvados instance {} are in {} and that this\n" +
202                "file is readable.").format(
203                    config_file, e, instance_name, config_file))
204
205     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
206         api_is_insecure = (
207             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
208                 ['1', 't', 'true', 'y', 'yes']))
209         client = arvados.api('v1',
210                              host=cfg['ARVADOS_API_HOST'],
211                              token=cfg['ARVADOS_API_TOKEN'],
212                              insecure=api_is_insecure,
213                              model=OrderedJsonModel())
214     else:
215         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
216     return client
217
218 # Check if git is available
219 def check_git_availability():
220     try:
221         arvados.util.run_command(['git', '--help'])
222     except Exception:
223         abort('git command is not available. Please ensure git is installed.')
224
225 # copy_pipeline_instance(pi_uuid, src, dst, args)
226 #
227 #    Copies a pipeline instance identified by pi_uuid from src to dst.
228 #
229 #    If the args.recursive option is set:
230 #      1. Copies all input collections
231 #           * For each component in the pipeline, include all collections
232 #             listed as job dependencies for that component)
233 #      2. Copy docker images
234 #      3. Copy git repositories
235 #      4. Copy the pipeline template
236 #
237 #    The only changes made to the copied pipeline instance are:
238 #      1. The original pipeline instance UUID is preserved in
239 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
240 #      2. The pipeline_template_uuid is changed to the new template uuid.
241 #      3. The owner_uuid of the instance is changed to the user who
242 #         copied it.
243 #
244 def copy_pipeline_instance(pi_uuid, src, dst, args):
245     # Fetch the pipeline instance record.
246     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
247
248     if args.recursive:
249         check_git_availability()
250
251         if not args.dst_git_repo:
252             abort('--dst-git-repo is required when copying a pipeline recursively.')
253         # Copy the pipeline template and save the copied template.
254         if pi.get('pipeline_template_uuid', None):
255             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
256                                         src, dst, args)
257
258         # Copy input collections, docker images and git repos.
259         pi = copy_collections(pi, src, dst, args)
260         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
261         copy_docker_images(pi, src, dst, args)
262
263         # Update the fields of the pipeline instance with the copied
264         # pipeline template.
265         if pi.get('pipeline_template_uuid', None):
266             pi['pipeline_template_uuid'] = pt['uuid']
267
268     else:
269         # not recursive
270         logger.info("Copying only pipeline instance %s.", pi_uuid)
271         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
272
273     # Update the pipeline instance properties, and create the new
274     # instance at dst.
275     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
276     pi['description'] = "Pipeline copied from {}\n\n{}".format(
277         pi_uuid,
278         pi['description'] if pi.get('description', None) else '')
279
280     pi['owner_uuid'] = args.project_uuid
281
282     del pi['uuid']
283
284     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
285     return new_pi
286
287 def filter_iter(arg):
288     """Iterate a filter string-or-list.
289
290     Pass in a filter field that can either be a string or list.
291     This will iterate elements as if the field had been written as a list.
292     """
293     if isinstance(arg, basestring):
294         return iter((arg,))
295     else:
296         return iter(arg)
297
298 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
299     """Update a single repository filter in-place for the destination.
300
301     If the filter checks that the repository is src_repository, it is
302     updated to check that the repository is dst_repository.  If it does
303     anything else, this function raises ValueError.
304     """
305     if src_repository is None:
306         raise ValueError("component does not specify a source repository")
307     elif dst_repository is None:
308         raise ValueError("no destination repository specified to update repository filter")
309     elif repo_filter[1:] == ['=', src_repository]:
310         repo_filter[2] = dst_repository
311     elif repo_filter[1:] == ['in', [src_repository]]:
312         repo_filter[2] = [dst_repository]
313     else:
314         raise ValueError("repository filter is not a simple source match")
315
316 def migrate_script_version_filter(version_filter):
317     """Update a single script_version filter in-place for the destination.
318
319     Currently this function checks that all the filter operands are Git
320     commit hashes.  If they're not, it raises ValueError to indicate that
321     the filter is not portable.  It could be extended to make other
322     transformations in the future.
323     """
324     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
325         raise ValueError("script_version filter is not limited to commit hashes")
326
327 def attr_filtered(filter_, *attr_names):
328     """Return True if filter_ applies to any of attr_names, else False."""
329     return any((name == 'any') or (name in attr_names)
330                for name in filter_iter(filter_[0]))
331
332 @contextlib.contextmanager
333 def exception_handler(handler, *exc_types):
334     """If any exc_types are raised in the block, call handler on the exception."""
335     try:
336         yield
337     except exc_types as error:
338         handler(error)
339
340 def migrate_components_filters(template_components, dst_git_repo):
341     """Update template component filters in-place for the destination.
342
343     template_components is a dictionary of components in a pipeline template.
344     This method walks over each component's filters, and updates them to have
345     identical semantics on the destination cluster.  It returns a list of
346     error strings that describe what filters could not be updated safely.
347
348     dst_git_repo is the name of the destination Git repository, which can
349     be None if that is not known.
350     """
351     errors = []
352     for cname, cspec in template_components.items():
353         def add_error(errmsg):
354             errors.append("{}: {}".format(cname, errmsg))
355         if not isinstance(cspec, dict):
356             add_error("value is not a component definition")
357             continue
358         src_repository = cspec.get('repository')
359         filters = cspec.get('filters', [])
360         if not isinstance(filters, list):
361             add_error("filters are not a list")
362             continue
363         for cfilter in filters:
364             if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
365                 add_error("malformed filter {!r}".format(cfilter))
366                 continue
367             if attr_filtered(cfilter, 'repository'):
368                 with exception_handler(add_error, ValueError):
369                     migrate_repository_filter(cfilter, src_repository, dst_git_repo)
370             if attr_filtered(cfilter, 'script_version'):
371                 with exception_handler(add_error, ValueError):
372                     migrate_script_version_filter(cfilter)
373     return errors
374
375 # copy_pipeline_template(pt_uuid, src, dst, args)
376 #
377 #    Copies a pipeline template identified by pt_uuid from src to dst.
378 #
379 #    If args.recursive is True, also copy any collections, docker
380 #    images and git repositories that this template references.
381 #
382 #    The owner_uuid of the new template is changed to that of the user
383 #    who copied the template.
384 #
385 #    Returns the copied pipeline template object.
386 #
387 def copy_pipeline_template(pt_uuid, src, dst, args):
388     # fetch the pipeline template from the source instance
389     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
390
391     if not args.force_filters:
392         filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
393         if filter_errors:
394             abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
395                   "\n".join(filter_errors))
396
397     if args.recursive:
398         check_git_availability()
399
400         if not args.dst_git_repo:
401             abort('--dst-git-repo is required when copying a pipeline recursively.')
402         # Copy input collections, docker images and git repos.
403         pt = copy_collections(pt, src, dst, args)
404         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
405         copy_docker_images(pt, src, dst, args)
406
407     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
408         pt_uuid,
409         pt['description'] if pt.get('description', None) else '')
410     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
411     del pt['uuid']
412
413     pt['owner_uuid'] = args.project_uuid
414
415     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
416
417 # copy_workflow(wf_uuid, src, dst, args)
418 #
419 #    Copies a workflow identified by wf_uuid from src to dst.
420 #
421 #    If args.recursive is True, also copy any collections
422 #      referenced in the workflow definition yaml.
423 #
424 #    The owner_uuid of the new workflow is set to any given
425 #      project_uuid or the user who copied the template.
426 #
427 #    Returns the copied workflow object.
428 #
429 def copy_workflow(wf_uuid, src, dst, args):
430     # fetch the workflow from the source instance
431     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
432
433     # copy collections and docker images
434     if args.recursive:
435         wf_def = yaml.safe_load(wf["definition"])
436         if wf_def is not None:
437             locations = []
438             docker_images = {}
439             graph = wf_def.get('$graph', None)
440             if graph is not None:
441                 workflow_collections(graph, locations, docker_images)
442             else:
443                 workflow_collections(wf_def, locations, docker_images)
444
445             if locations:
446                 copy_collections(locations, src, dst, args)
447
448             for image in docker_images:
449                 copy_docker_image(image, docker_images[image], src, dst, args)
450
451     # copy the workflow itself
452     del wf['uuid']
453     wf['owner_uuid'] = args.project_uuid
454     return dst.workflows().create(body=wf).execute(num_retries=args.retries)
455
456 def workflow_collections(obj, locations, docker_images):
457     if isinstance(obj, dict):
458         loc = obj.get('location', None)
459         if loc is not None:
460             if loc.startswith("keep:"):
461                 locations.append(loc[5:])
462
463         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
464         if docker_image is not None:
465             ds = docker_image.split(":", 1)
466             tag = ds[1] if len(ds)==2 else 'latest'
467             docker_images[ds[0]] = tag
468
469         for x in obj:
470             workflow_collections(obj[x], locations, docker_images)
471     elif isinstance(obj, list):
472         for x in obj:
473             workflow_collections(x, locations, docker_images)
474
475 # copy_collections(obj, src, dst, args)
476 #
477 #    Recursively copies all collections referenced by 'obj' from src
478 #    to dst.  obj may be a dict or a list, in which case we run
479 #    copy_collections on every value it contains. If it is a string,
480 #    search it for any substring that matches a collection hash or uuid
481 #    (this will find hidden references to collections like
482 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
483 #
484 #    Returns a copy of obj with any old collection uuids replaced by
485 #    the new ones.
486 #
487 def copy_collections(obj, src, dst, args):
488
489     def copy_collection_fn(collection_match):
490         """Helper function for regex substitution: copies a single collection,
491         identified by the collection_match MatchObject, to the
492         destination.  Returns the destination collection uuid (or the
493         portable data hash if that's what src_id is).
494
495         """
496         src_id = collection_match.group(0)
497         if src_id not in collections_copied:
498             dst_col = copy_collection(src_id, src, dst, args)
499             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
500                 collections_copied[src_id] = src_id
501             else:
502                 collections_copied[src_id] = dst_col['uuid']
503         return collections_copied[src_id]
504
505     if isinstance(obj, basestring):
506         # Copy any collections identified in this string to dst, replacing
507         # them with the dst uuids as necessary.
508         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
509         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
510         return obj
511     elif isinstance(obj, dict):
512         return type(obj)((v, copy_collections(obj[v], src, dst, args))
513                          for v in obj)
514     elif isinstance(obj, list):
515         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
516     return obj
517
518 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
519     """Copy a job's script to the destination repository, and update its record.
520
521     Given a jobspec dictionary, this function finds the referenced script from
522     src and copies it to dst and dst_repo.  It also updates jobspec in place to
523     refer to names on the destination.
524     """
525     repo = jobspec.get('repository')
526     if repo is None:
527         return
528     # script_version is the "script_version" parameter from the source
529     # component or job.  If no script_version was supplied in the
530     # component or job, it is a mistake in the pipeline, but for the
531     # purposes of copying the repository, default to "master".
532     script_version = jobspec.get('script_version') or 'master'
533     script_key = (repo, script_version)
534     if script_key not in scripts_copied:
535         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
536         scripts_copied.add(script_key)
537     jobspec['repository'] = dst_repo
538     repo_dir = local_repo_dir[repo]
539     for version_key in ['script_version', 'supplied_script_version']:
540         if version_key in jobspec:
541             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
542
543 # copy_git_repos(p, src, dst, dst_repo, args)
544 #
545 #    Copies all git repositories referenced by pipeline instance or
546 #    template 'p' from src to dst.
547 #
548 #    For each component c in the pipeline:
549 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
550 #      * Rename script versions:
551 #          * c['script_version']
552 #          * c['job']['script_version']
553 #          * c['job']['supplied_script_version']
554 #        to the commit hashes they resolve to, since any symbolic
555 #        names (tags, branches) are not preserved in the destination repo.
556 #
557 #    The pipeline object is updated in place with the new repository
558 #    names.  The return value is undefined.
559 #
560 def copy_git_repos(p, src, dst, dst_repo, args):
561     for component in p['components'].values():
562         migrate_jobspec(component, src, dst, dst_repo, args)
563         if 'job' in component:
564             migrate_jobspec(component['job'], src, dst, dst_repo, args)
565
566 def total_collection_size(manifest_text):
567     """Return the total number of bytes in this collection (excluding
568     duplicate blocks)."""
569
570     total_bytes = 0
571     locators_seen = {}
572     for line in manifest_text.splitlines():
573         words = line.split()
574         for word in words[1:]:
575             try:
576                 loc = arvados.KeepLocator(word)
577             except ValueError:
578                 continue  # this word isn't a locator, skip it
579             if loc.md5sum not in locators_seen:
580                 locators_seen[loc.md5sum] = True
581                 total_bytes += loc.size
582
583     return total_bytes
584
585 def create_collection_from(c, src, dst, args):
586     """Create a new collection record on dst, and copy Docker metadata if
587     available."""
588
589     collection_uuid = c['uuid']
590     del c['uuid']
591
592     if not c["name"]:
593         c['name'] = "copied from " + collection_uuid
594
595     if 'properties' in c:
596         del c['properties']
597
598     c['owner_uuid'] = args.project_uuid
599
600     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
601
602     # Create docker_image_repo+tag and docker_image_hash links
603     # at the destination.
604     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
605         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
606
607         for src_link in docker_links:
608             body = {key: src_link[key]
609                     for key in ['link_class', 'name', 'properties']}
610             body['head_uuid'] = dst_collection['uuid']
611             body['owner_uuid'] = args.project_uuid
612
613             lk = dst.links().create(body=body).execute(num_retries=args.retries)
614             logger.debug('created dst link {}'.format(lk))
615
616     return dst_collection
617
618 # copy_collection(obj_uuid, src, dst, args)
619 #
620 #    Copies the collection identified by obj_uuid from src to dst.
621 #    Returns the collection object created at dst.
622 #
623 #    If args.progress is True, produce a human-friendly progress
624 #    report.
625 #
626 #    If a collection with the desired portable_data_hash already
627 #    exists at dst, and args.force is False, copy_collection returns
628 #    the existing collection without copying any blocks.  Otherwise
629 #    (if no collection exists or if args.force is True)
630 #    copy_collection copies all of the collection data blocks from src
631 #    to dst.
632 #
633 #    For this application, it is critical to preserve the
634 #    collection's manifest hash, which is not guaranteed with the
635 #    arvados.CollectionReader and arvados.CollectionWriter classes.
636 #    Copying each block in the collection manually, followed by
637 #    the manifest block, ensures that the collection's manifest
638 #    hash will not change.
639 #
640 def copy_collection(obj_uuid, src, dst, args):
641     if arvados.util.keep_locator_pattern.match(obj_uuid):
642         # If the obj_uuid is a portable data hash, it might not be uniquely
643         # identified with a particular collection.  As a result, it is
644         # ambigious as to what name to use for the copy.  Apply some heuristics
645         # to pick which collection to get the name from.
646         srccol = src.collections().list(
647             filters=[['portable_data_hash', '=', obj_uuid]],
648             order="created_at asc"
649             ).execute(num_retries=args.retries)
650
651         items = srccol.get("items")
652
653         if not items:
654             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
655             return
656
657         c = None
658
659         if len(items) == 1:
660             # There's only one collection with the PDH, so use that.
661             c = items[0]
662         if not c:
663             # See if there is a collection that's in the same project
664             # as the root item (usually a pipeline) being copied.
665             for i in items:
666                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
667                     c = i
668                     break
669         if not c:
670             # Didn't find any collections located in the same project, so
671             # pick the oldest collection that has a name assigned to it.
672             for i in items:
673                 if i.get("name"):
674                     c = i
675                     break
676         if not c:
677             # None of the collections have names (?!), so just pick the
678             # first one.
679             c = items[0]
680
681         # list() doesn't return manifest text (and we don't want it to,
682         # because we don't need the same maninfest text sent to us 50
683         # times) so go and retrieve the collection object directly
684         # which will include the manifest text.
685         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
686     else:
687         # Assume this is an actual collection uuid, so fetch it directly.
688         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
689
690     # If a collection with this hash already exists at the
691     # destination, and 'force' is not true, just return that
692     # collection.
693     if not args.force:
694         if 'portable_data_hash' in c:
695             colhash = c['portable_data_hash']
696         else:
697             colhash = c['uuid']
698         dstcol = dst.collections().list(
699             filters=[['portable_data_hash', '=', colhash]]
700         ).execute(num_retries=args.retries)
701         if dstcol['items_available'] > 0:
702             for d in dstcol['items']:
703                 if ((args.project_uuid == d['owner_uuid']) and
704                     (c.get('name') == d['name']) and
705                     (c['portable_data_hash'] == d['portable_data_hash'])):
706                     return d
707             c['manifest_text'] = dst.collections().get(
708                 uuid=dstcol['items'][0]['uuid']
709             ).execute(num_retries=args.retries)['manifest_text']
710             return create_collection_from(c, src, dst, args)
711
712     # Fetch the collection's manifest.
713     manifest = c['manifest_text']
714     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
715
716     # Copy each block from src_keep to dst_keep.
717     # Use the newly signed locators returned from dst_keep to build
718     # a new manifest as we go.
719     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
720     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
721     dst_manifest = ""
722     dst_locators = {}
723     bytes_written = 0
724     bytes_expected = total_collection_size(manifest)
725     if args.progress:
726         progress_writer = ProgressWriter(human_progress)
727     else:
728         progress_writer = None
729
730     for line in manifest.splitlines():
731         words = line.split()
732         dst_manifest += words[0]
733         for word in words[1:]:
734             try:
735                 loc = arvados.KeepLocator(word)
736             except ValueError:
737                 # If 'word' can't be parsed as a locator,
738                 # presume it's a filename.
739                 dst_manifest += ' ' + word
740                 continue
741             blockhash = loc.md5sum
742             # copy this block if we haven't seen it before
743             # (otherwise, just reuse the existing dst_locator)
744             if blockhash not in dst_locators:
745                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
746                 if progress_writer:
747                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
748                 data = src_keep.get(word)
749                 dst_locator = dst_keep.put(data)
750                 dst_locators[blockhash] = dst_locator
751                 bytes_written += loc.size
752             dst_manifest += ' ' + dst_locators[blockhash]
753         dst_manifest += "\n"
754
755     if progress_writer:
756         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
757         progress_writer.finish()
758
759     # Copy the manifest and save the collection.
760     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
761
762     c['manifest_text'] = dst_manifest
763     return create_collection_from(c, src, dst, args)
764
765 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
766     r = api.repositories().list(
767         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
768     if r['items_available'] != 1:
769         raise Exception('cannot identify repo {}; {} repos found'
770                         .format(repo_name, r['items_available']))
771
772     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
773     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
774     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
775
776     priority = https_url + other_url + http_url
777
778     git_config = []
779     git_url = None
780     for url in priority:
781         if url.startswith("http"):
782             u = urllib.parse.urlsplit(url)
783             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
784             git_config = ["-c", "credential.%s/.username=none" % baseurl,
785                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
786         else:
787             git_config = []
788
789         try:
790             logger.debug("trying %s", url)
791             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
792                                       env={"HOME": os.environ["HOME"],
793                                            "ARVADOS_API_TOKEN": api.api_token,
794                                            "GIT_ASKPASS": "/bin/false"})
795         except arvados.errors.CommandFailedError:
796             pass
797         else:
798             git_url = url
799             break
800
801     if not git_url:
802         raise Exception('Cannot access git repository, tried {}'
803                         .format(priority))
804
805     if git_url.startswith("http:"):
806         if allow_insecure_http:
807             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
808         else:
809             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
810
811     return (git_url, git_config)
812
813
814 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
815 #
816 #    Copies commits from git repository 'src_git_repo' on Arvados
817 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
818 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
819 #    or "jsmith")
820 #
821 #    All commits will be copied to a destination branch named for the
822 #    source repository URL.
823 #
824 #    The destination repository must already exist.
825 #
826 #    The user running this command must be authenticated
827 #    to both repositories.
828 #
829 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
830     # Identify the fetch and push URLs for the git repositories.
831
832     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
833     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
834
835     logger.debug('src_git_url: {}'.format(src_git_url))
836     logger.debug('dst_git_url: {}'.format(dst_git_url))
837
838     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
839
840     # Copy git commits from src repo to dst repo.
841     if src_git_repo not in local_repo_dir:
842         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
843         arvados.util.run_command(
844             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
845              local_repo_dir[src_git_repo]],
846             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
847             env={"HOME": os.environ["HOME"],
848                  "ARVADOS_API_TOKEN": src.api_token,
849                  "GIT_ASKPASS": "/bin/false"})
850         arvados.util.run_command(
851             ["git", "remote", "add", "dst", dst_git_url],
852             cwd=local_repo_dir[src_git_repo])
853     arvados.util.run_command(
854         ["git", "branch", dst_branch, script_version],
855         cwd=local_repo_dir[src_git_repo])
856     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
857                              cwd=local_repo_dir[src_git_repo],
858                              env={"HOME": os.environ["HOME"],
859                                   "ARVADOS_API_TOKEN": dst.api_token,
860                                   "GIT_ASKPASS": "/bin/false"})
861
862 def copy_docker_images(pipeline, src, dst, args):
863     """Copy any docker images named in the pipeline components'
864     runtime_constraints field from src to dst."""
865
866     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
867     for c_name, c_info in pipeline['components'].items():
868         if ('runtime_constraints' in c_info and
869             'docker_image' in c_info['runtime_constraints']):
870             copy_docker_image(
871                 c_info['runtime_constraints']['docker_image'],
872                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
873                 src, dst, args)
874
875
876 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
877     """Copy the docker image identified by docker_image and
878     docker_image_tag from src to dst. Create appropriate
879     docker_image_repo+tag and docker_image_hash links at dst.
880
881     """
882
883     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
884
885     # Find the link identifying this docker image.
886     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
887         src, args.retries, docker_image, docker_image_tag)
888     if docker_image_list:
889         image_uuid, image_info = docker_image_list[0]
890         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
891
892         # Copy the collection it refers to.
893         dst_image_col = copy_collection(image_uuid, src, dst, args)
894     elif arvados.util.keep_locator_pattern.match(docker_image):
895         dst_image_col = copy_collection(docker_image, src, dst, args)
896     else:
897         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
898
899 # git_rev_parse(rev, repo)
900 #
901 #    Returns the 40-character commit hash corresponding to 'rev' in
902 #    git repository 'repo' (which must be the path of a local git
903 #    repository)
904 #
905 def git_rev_parse(rev, repo):
906     gitout, giterr = arvados.util.run_command(
907         ['git', 'rev-parse', rev], cwd=repo)
908     return gitout.strip()
909
910 # uuid_type(api, object_uuid)
911 #
912 #    Returns the name of the class that object_uuid belongs to, based on
913 #    the second field of the uuid.  This function consults the api's
914 #    schema to identify the object class.
915 #
916 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
917 #
918 #    Special case: if handed a Keep locator hash, return 'Collection'.
919 #
920 def uuid_type(api, object_uuid):
921     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
922         return 'Collection'
923     p = object_uuid.split('-')
924     if len(p) == 3:
925         type_prefix = p[1]
926         for k in api._schema.schemas:
927             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
928             if type_prefix == obj_class:
929                 return k
930     return None
931
932 def abort(msg, code=1):
933     logger.info("arv-copy: %s", msg)
934     exit(code)
935
936
937 # Code for reporting on the progress of a collection upload.
938 # Stolen from arvados.commands.put.ArvPutCollectionWriter
939 # TODO(twp): figure out how to refactor into a shared library
940 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
941 # code)
942
943 def machine_progress(obj_uuid, bytes_written, bytes_expected):
944     return "{} {}: {} {} written {} total\n".format(
945         sys.argv[0],
946         os.getpid(),
947         obj_uuid,
948         bytes_written,
949         -1 if (bytes_expected is None) else bytes_expected)
950
951 def human_progress(obj_uuid, bytes_written, bytes_expected):
952     if bytes_expected:
953         return "\r{}: {}M / {}M {:.1%} ".format(
954             obj_uuid,
955             bytes_written >> 20, bytes_expected >> 20,
956             float(bytes_written) / bytes_expected)
957     else:
958         return "\r{}: {} ".format(obj_uuid, bytes_written)
959
960 class ProgressWriter(object):
961     _progress_func = None
962     outfile = sys.stderr
963
964     def __init__(self, progress_func):
965         self._progress_func = progress_func
966
967     def report(self, obj_uuid, bytes_written, bytes_expected):
968         if self._progress_func is not None:
969             self.outfile.write(
970                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
971
972     def finish(self):
973         self.outfile.write("\n")
974
975 if __name__ == '__main__':
976     main()