8784: Fix test for latest firefox.
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
2 #
3 # Copies an object from Arvados instance src to instance dst.
4 #
5 # By default, arv-copy recursively copies any dependent objects
6 # necessary to make the object functional in the new instance
7 # (e.g. for a pipeline instance, arv-copy copies the pipeline
8 # template, input collection, docker images, git repositories). If
9 # --no-recursive is given, arv-copy copies only the single record
10 # identified by object-uuid.
11 #
12 # The user must have files $HOME/.config/arvados/{src}.conf and
13 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
14 # instances src and dst.  If either of these files is not found,
15 # arv-copy will issue an error.
16
17 from __future__ import division
18 from future import standard_library
19 from future.utils import listvalues
20 standard_library.install_aliases()
21 from past.builtins import basestring
22 from builtins import object
23 import argparse
24 import contextlib
25 import getpass
26 import os
27 import re
28 import shutil
29 import sys
30 import logging
31 import tempfile
32 import urllib.parse
33
34 import arvados
35 import arvados.config
36 import arvados.keep
37 import arvados.util
38 import arvados.commands._util as arv_cmd
39 import arvados.commands.keepdocker
40 import ruamel.yaml as yaml
41
42 from arvados.api import OrderedJsonModel
43 from arvados._version import __version__
44
45 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
46
47 logger = logging.getLogger('arvados.arv-copy')
48
49 # local_repo_dir records which git repositories from the Arvados source
50 # instance have been checked out locally during this run, and to which
51 # directories.
52 # e.g. if repository 'twp' from src_arv has been cloned into
53 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
54 #
55 local_repo_dir = {}
56
57 # List of collections that have been copied in this session, and their
58 # destination collection UUIDs.
59 collections_copied = {}
60
61 # Set of (repository, script_version) two-tuples of commits copied in git.
62 scripts_copied = set()
63
64 # The owner_uuid of the object being copied
65 src_owner_uuid = None
66
67 def main():
68     copy_opts = argparse.ArgumentParser(add_help=False)
69
70     copy_opts.add_argument(
71         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
72         help='Print version and exit.')
73     copy_opts.add_argument(
74         '-v', '--verbose', dest='verbose', action='store_true',
75         help='Verbose output.')
76     copy_opts.add_argument(
77         '--progress', dest='progress', action='store_true',
78         help='Report progress on copying collections. (default)')
79     copy_opts.add_argument(
80         '--no-progress', dest='progress', action='store_false',
81         help='Do not report progress on copying collections.')
82     copy_opts.add_argument(
83         '-f', '--force', dest='force', action='store_true',
84         help='Perform copy even if the object appears to exist at the remote destination.')
85     copy_opts.add_argument(
86         '--force-filters', action='store_true', default=False,
87         help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
88     copy_opts.add_argument(
89         '--src', dest='source_arvados', required=True,
90         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
91     copy_opts.add_argument(
92         '--dst', dest='destination_arvados', required=True,
93         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
94     copy_opts.add_argument(
95         '--recursive', dest='recursive', action='store_true',
96         help='Recursively copy any dependencies for this object. (default)')
97     copy_opts.add_argument(
98         '--no-recursive', dest='recursive', action='store_false',
99         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
100     copy_opts.add_argument(
101         '--dst-git-repo', dest='dst_git_repo',
102         help='The name of the destination git repository. Required when copying a pipeline recursively.')
103     copy_opts.add_argument(
104         '--project-uuid', dest='project_uuid',
105         help='The UUID of the project at the destination to which the pipeline should be copied.')
106     copy_opts.add_argument(
107         '--allow-git-http-src', action="store_true",
108         help='Allow cloning git repositories over insecure http')
109     copy_opts.add_argument(
110         '--allow-git-http-dst', action="store_true",
111         help='Allow pushing git repositories over insecure http')
112
113     copy_opts.add_argument(
114         'object_uuid',
115         help='The UUID of the object to be copied.')
116     copy_opts.set_defaults(progress=True)
117     copy_opts.set_defaults(recursive=True)
118
119     parser = argparse.ArgumentParser(
120         description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
121         parents=[copy_opts, arv_cmd.retry_opt])
122     args = parser.parse_args()
123
124     if args.verbose:
125         logger.setLevel(logging.DEBUG)
126     else:
127         logger.setLevel(logging.INFO)
128
129     # Create API clients for the source and destination instances
130     src_arv = api_for_instance(args.source_arvados)
131     dst_arv = api_for_instance(args.destination_arvados)
132
133     if not args.project_uuid:
134         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
135
136     # Identify the kind of object we have been given, and begin copying.
137     t = uuid_type(src_arv, args.object_uuid)
138     if t == 'Collection':
139         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
140         result = copy_collection(args.object_uuid,
141                                  src_arv, dst_arv,
142                                  args)
143     elif t == 'PipelineInstance':
144         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
145         result = copy_pipeline_instance(args.object_uuid,
146                                         src_arv, dst_arv,
147                                         args)
148     elif t == 'PipelineTemplate':
149         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
150         result = copy_pipeline_template(args.object_uuid,
151                                         src_arv, dst_arv, args)
152     elif t == 'Workflow':
153         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
154         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
155     else:
156         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
157
158     # Clean up any outstanding temp git repositories.
159     for d in listvalues(local_repo_dir):
160         shutil.rmtree(d, ignore_errors=True)
161
162     # If no exception was thrown and the response does not have an
163     # error_token field, presume success
164     if 'error_token' in result or 'uuid' not in result:
165         logger.error("API server returned an error result: {}".format(result))
166         exit(1)
167
168     logger.info("")
169     logger.info("Success: created copy with uuid {}".format(result['uuid']))
170     exit(0)
171
172 def set_src_owner_uuid(resource, uuid, args):
173     global src_owner_uuid
174     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
175     src_owner_uuid = c.get("owner_uuid")
176
177 # api_for_instance(instance_name)
178 #
179 #     Creates an API client for the Arvados instance identified by
180 #     instance_name.
181 #
182 #     If instance_name contains a slash, it is presumed to be a path
183 #     (either local or absolute) to a file with Arvados configuration
184 #     settings.
185 #
186 #     Otherwise, it is presumed to be the name of a file in
187 #     $HOME/.config/arvados/instance_name.conf
188 #
189 def api_for_instance(instance_name):
190     if '/' in instance_name:
191         config_file = instance_name
192     else:
193         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
194
195     try:
196         cfg = arvados.config.load(config_file)
197     except (IOError, OSError) as e:
198         abort(("Could not open config file {}: {}\n" +
199                "You must make sure that your configuration tokens\n" +
200                "for Arvados instance {} are in {} and that this\n" +
201                "file is readable.").format(
202                    config_file, e, instance_name, config_file))
203
204     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
205         api_is_insecure = (
206             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
207                 ['1', 't', 'true', 'y', 'yes']))
208         client = arvados.api('v1',
209                              host=cfg['ARVADOS_API_HOST'],
210                              token=cfg['ARVADOS_API_TOKEN'],
211                              insecure=api_is_insecure,
212                              model=OrderedJsonModel())
213     else:
214         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
215     return client
216
217 # Check if git is available
218 def check_git_availability():
219     try:
220         arvados.util.run_command(['git', '--help'])
221     except Exception:
222         abort('git command is not available. Please ensure git is installed.')
223
224 # copy_pipeline_instance(pi_uuid, src, dst, args)
225 #
226 #    Copies a pipeline instance identified by pi_uuid from src to dst.
227 #
228 #    If the args.recursive option is set:
229 #      1. Copies all input collections
230 #           * For each component in the pipeline, include all collections
231 #             listed as job dependencies for that component)
232 #      2. Copy docker images
233 #      3. Copy git repositories
234 #      4. Copy the pipeline template
235 #
236 #    The only changes made to the copied pipeline instance are:
237 #      1. The original pipeline instance UUID is preserved in
238 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
239 #      2. The pipeline_template_uuid is changed to the new template uuid.
240 #      3. The owner_uuid of the instance is changed to the user who
241 #         copied it.
242 #
243 def copy_pipeline_instance(pi_uuid, src, dst, args):
244     # Fetch the pipeline instance record.
245     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
246
247     if args.recursive:
248         check_git_availability()
249
250         if not args.dst_git_repo:
251             abort('--dst-git-repo is required when copying a pipeline recursively.')
252         # Copy the pipeline template and save the copied template.
253         if pi.get('pipeline_template_uuid', None):
254             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
255                                         src, dst, args)
256
257         # Copy input collections, docker images and git repos.
258         pi = copy_collections(pi, src, dst, args)
259         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
260         copy_docker_images(pi, src, dst, args)
261
262         # Update the fields of the pipeline instance with the copied
263         # pipeline template.
264         if pi.get('pipeline_template_uuid', None):
265             pi['pipeline_template_uuid'] = pt['uuid']
266
267     else:
268         # not recursive
269         logger.info("Copying only pipeline instance %s.", pi_uuid)
270         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
271
272     # Update the pipeline instance properties, and create the new
273     # instance at dst.
274     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
275     pi['description'] = "Pipeline copied from {}\n\n{}".format(
276         pi_uuid,
277         pi['description'] if pi.get('description', None) else '')
278
279     pi['owner_uuid'] = args.project_uuid
280
281     del pi['uuid']
282
283     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
284     return new_pi
285
286 def filter_iter(arg):
287     """Iterate a filter string-or-list.
288
289     Pass in a filter field that can either be a string or list.
290     This will iterate elements as if the field had been written as a list.
291     """
292     if isinstance(arg, basestring):
293         return iter((arg,))
294     else:
295         return iter(arg)
296
297 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
298     """Update a single repository filter in-place for the destination.
299
300     If the filter checks that the repository is src_repository, it is
301     updated to check that the repository is dst_repository.  If it does
302     anything else, this function raises ValueError.
303     """
304     if src_repository is None:
305         raise ValueError("component does not specify a source repository")
306     elif dst_repository is None:
307         raise ValueError("no destination repository specified to update repository filter")
308     elif repo_filter[1:] == ['=', src_repository]:
309         repo_filter[2] = dst_repository
310     elif repo_filter[1:] == ['in', [src_repository]]:
311         repo_filter[2] = [dst_repository]
312     else:
313         raise ValueError("repository filter is not a simple source match")
314
315 def migrate_script_version_filter(version_filter):
316     """Update a single script_version filter in-place for the destination.
317
318     Currently this function checks that all the filter operands are Git
319     commit hashes.  If they're not, it raises ValueError to indicate that
320     the filter is not portable.  It could be extended to make other
321     transformations in the future.
322     """
323     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
324         raise ValueError("script_version filter is not limited to commit hashes")
325
326 def attr_filtered(filter_, *attr_names):
327     """Return True if filter_ applies to any of attr_names, else False."""
328     return any((name == 'any') or (name in attr_names)
329                for name in filter_iter(filter_[0]))
330
331 @contextlib.contextmanager
332 def exception_handler(handler, *exc_types):
333     """If any exc_types are raised in the block, call handler on the exception."""
334     try:
335         yield
336     except exc_types as error:
337         handler(error)
338
339 def migrate_components_filters(template_components, dst_git_repo):
340     """Update template component filters in-place for the destination.
341
342     template_components is a dictionary of components in a pipeline template.
343     This method walks over each component's filters, and updates them to have
344     identical semantics on the destination cluster.  It returns a list of
345     error strings that describe what filters could not be updated safely.
346
347     dst_git_repo is the name of the destination Git repository, which can
348     be None if that is not known.
349     """
350     errors = []
351     for cname, cspec in template_components.items():
352         def add_error(errmsg):
353             errors.append("{}: {}".format(cname, errmsg))
354         if not isinstance(cspec, dict):
355             add_error("value is not a component definition")
356             continue
357         src_repository = cspec.get('repository')
358         filters = cspec.get('filters', [])
359         if not isinstance(filters, list):
360             add_error("filters are not a list")
361             continue
362         for cfilter in filters:
363             if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
364                 add_error("malformed filter {!r}".format(cfilter))
365                 continue
366             if attr_filtered(cfilter, 'repository'):
367                 with exception_handler(add_error, ValueError):
368                     migrate_repository_filter(cfilter, src_repository, dst_git_repo)
369             if attr_filtered(cfilter, 'script_version'):
370                 with exception_handler(add_error, ValueError):
371                     migrate_script_version_filter(cfilter)
372     return errors
373
374 # copy_pipeline_template(pt_uuid, src, dst, args)
375 #
376 #    Copies a pipeline template identified by pt_uuid from src to dst.
377 #
378 #    If args.recursive is True, also copy any collections, docker
379 #    images and git repositories that this template references.
380 #
381 #    The owner_uuid of the new template is changed to that of the user
382 #    who copied the template.
383 #
384 #    Returns the copied pipeline template object.
385 #
386 def copy_pipeline_template(pt_uuid, src, dst, args):
387     # fetch the pipeline template from the source instance
388     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
389
390     if not args.force_filters:
391         filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
392         if filter_errors:
393             abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
394                   "\n".join(filter_errors))
395
396     if args.recursive:
397         check_git_availability()
398
399         if not args.dst_git_repo:
400             abort('--dst-git-repo is required when copying a pipeline recursively.')
401         # Copy input collections, docker images and git repos.
402         pt = copy_collections(pt, src, dst, args)
403         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
404         copy_docker_images(pt, src, dst, args)
405
406     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
407         pt_uuid,
408         pt['description'] if pt.get('description', None) else '')
409     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
410     del pt['uuid']
411
412     pt['owner_uuid'] = args.project_uuid
413
414     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
415
416 # copy_workflow(wf_uuid, src, dst, args)
417 #
418 #    Copies a workflow identified by wf_uuid from src to dst.
419 #
420 #    If args.recursive is True, also copy any collections
421 #      referenced in the workflow definition yaml.
422 #
423 #    The owner_uuid of the new workflow is set to any given
424 #      project_uuid or the user who copied the template.
425 #
426 #    Returns the copied workflow object.
427 #
428 def copy_workflow(wf_uuid, src, dst, args):
429     # fetch the workflow from the source instance
430     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
431
432     # copy collections and docker images
433     if args.recursive:
434         wf_def = yaml.safe_load(wf["definition"])
435         if wf_def is not None:
436             locations = []
437             docker_images = {}
438             graph = wf_def.get('$graph', None)
439             if graph is not None:
440                 workflow_collections(graph, locations, docker_images)
441             else:
442                 workflow_collections(wf_def, locations, docker_images)
443
444             if locations:
445                 copy_collections(locations, src, dst, args)
446
447             for image in docker_images:
448                 copy_docker_image(image, docker_images[image], src, dst, args)
449
450     # copy the workflow itself
451     del wf['uuid']
452     wf['owner_uuid'] = args.project_uuid
453     return dst.workflows().create(body=wf).execute(num_retries=args.retries)
454
455 def workflow_collections(obj, locations, docker_images):
456     if isinstance(obj, dict):
457         loc = obj.get('location', None)
458         if loc is not None:
459             if loc.startswith("keep:"):
460                 locations.append(loc[5:])
461
462         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
463         if docker_image is not None:
464             ds = docker_image.split(":", 1)
465             tag = ds[1] if len(ds)==2 else 'latest'
466             docker_images[ds[0]] = tag
467
468         for x in obj:
469             workflow_collections(obj[x], locations, docker_images)
470     elif isinstance(obj, list):
471         for x in obj:
472             workflow_collections(x, locations, docker_images)
473
474 # copy_collections(obj, src, dst, args)
475 #
476 #    Recursively copies all collections referenced by 'obj' from src
477 #    to dst.  obj may be a dict or a list, in which case we run
478 #    copy_collections on every value it contains. If it is a string,
479 #    search it for any substring that matches a collection hash or uuid
480 #    (this will find hidden references to collections like
481 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
482 #
483 #    Returns a copy of obj with any old collection uuids replaced by
484 #    the new ones.
485 #
486 def copy_collections(obj, src, dst, args):
487
488     def copy_collection_fn(collection_match):
489         """Helper function for regex substitution: copies a single collection,
490         identified by the collection_match MatchObject, to the
491         destination.  Returns the destination collection uuid (or the
492         portable data hash if that's what src_id is).
493
494         """
495         src_id = collection_match.group(0)
496         if src_id not in collections_copied:
497             dst_col = copy_collection(src_id, src, dst, args)
498             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
499                 collections_copied[src_id] = src_id
500             else:
501                 collections_copied[src_id] = dst_col['uuid']
502         return collections_copied[src_id]
503
504     if isinstance(obj, basestring):
505         # Copy any collections identified in this string to dst, replacing
506         # them with the dst uuids as necessary.
507         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
508         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
509         return obj
510     elif isinstance(obj, dict):
511         return type(obj)((v, copy_collections(obj[v], src, dst, args))
512                          for v in obj)
513     elif isinstance(obj, list):
514         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
515     return obj
516
517 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
518     """Copy a job's script to the destination repository, and update its record.
519
520     Given a jobspec dictionary, this function finds the referenced script from
521     src and copies it to dst and dst_repo.  It also updates jobspec in place to
522     refer to names on the destination.
523     """
524     repo = jobspec.get('repository')
525     if repo is None:
526         return
527     # script_version is the "script_version" parameter from the source
528     # component or job.  If no script_version was supplied in the
529     # component or job, it is a mistake in the pipeline, but for the
530     # purposes of copying the repository, default to "master".
531     script_version = jobspec.get('script_version') or 'master'
532     script_key = (repo, script_version)
533     if script_key not in scripts_copied:
534         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
535         scripts_copied.add(script_key)
536     jobspec['repository'] = dst_repo
537     repo_dir = local_repo_dir[repo]
538     for version_key in ['script_version', 'supplied_script_version']:
539         if version_key in jobspec:
540             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
541
542 # copy_git_repos(p, src, dst, dst_repo, args)
543 #
544 #    Copies all git repositories referenced by pipeline instance or
545 #    template 'p' from src to dst.
546 #
547 #    For each component c in the pipeline:
548 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
549 #      * Rename script versions:
550 #          * c['script_version']
551 #          * c['job']['script_version']
552 #          * c['job']['supplied_script_version']
553 #        to the commit hashes they resolve to, since any symbolic
554 #        names (tags, branches) are not preserved in the destination repo.
555 #
556 #    The pipeline object is updated in place with the new repository
557 #    names.  The return value is undefined.
558 #
559 def copy_git_repos(p, src, dst, dst_repo, args):
560     for component in p['components'].values():
561         migrate_jobspec(component, src, dst, dst_repo, args)
562         if 'job' in component:
563             migrate_jobspec(component['job'], src, dst, dst_repo, args)
564
565 def total_collection_size(manifest_text):
566     """Return the total number of bytes in this collection (excluding
567     duplicate blocks)."""
568
569     total_bytes = 0
570     locators_seen = {}
571     for line in manifest_text.splitlines():
572         words = line.split()
573         for word in words[1:]:
574             try:
575                 loc = arvados.KeepLocator(word)
576             except ValueError:
577                 continue  # this word isn't a locator, skip it
578             if loc.md5sum not in locators_seen:
579                 locators_seen[loc.md5sum] = True
580                 total_bytes += loc.size
581
582     return total_bytes
583
584 def create_collection_from(c, src, dst, args):
585     """Create a new collection record on dst, and copy Docker metadata if
586     available."""
587
588     collection_uuid = c['uuid']
589     del c['uuid']
590
591     if not c["name"]:
592         c['name'] = "copied from " + collection_uuid
593
594     if 'properties' in c:
595         del c['properties']
596
597     c['owner_uuid'] = args.project_uuid
598
599     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
600
601     # Create docker_image_repo+tag and docker_image_hash links
602     # at the destination.
603     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
604         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
605
606         for src_link in docker_links:
607             body = {key: src_link[key]
608                     for key in ['link_class', 'name', 'properties']}
609             body['head_uuid'] = dst_collection['uuid']
610             body['owner_uuid'] = args.project_uuid
611
612             lk = dst.links().create(body=body).execute(num_retries=args.retries)
613             logger.debug('created dst link {}'.format(lk))
614
615     return dst_collection
616
617 # copy_collection(obj_uuid, src, dst, args)
618 #
619 #    Copies the collection identified by obj_uuid from src to dst.
620 #    Returns the collection object created at dst.
621 #
622 #    If args.progress is True, produce a human-friendly progress
623 #    report.
624 #
625 #    If a collection with the desired portable_data_hash already
626 #    exists at dst, and args.force is False, copy_collection returns
627 #    the existing collection without copying any blocks.  Otherwise
628 #    (if no collection exists or if args.force is True)
629 #    copy_collection copies all of the collection data blocks from src
630 #    to dst.
631 #
632 #    For this application, it is critical to preserve the
633 #    collection's manifest hash, which is not guaranteed with the
634 #    arvados.CollectionReader and arvados.CollectionWriter classes.
635 #    Copying each block in the collection manually, followed by
636 #    the manifest block, ensures that the collection's manifest
637 #    hash will not change.
638 #
639 def copy_collection(obj_uuid, src, dst, args):
640     if arvados.util.keep_locator_pattern.match(obj_uuid):
641         # If the obj_uuid is a portable data hash, it might not be uniquely
642         # identified with a particular collection.  As a result, it is
643         # ambigious as to what name to use for the copy.  Apply some heuristics
644         # to pick which collection to get the name from.
645         srccol = src.collections().list(
646             filters=[['portable_data_hash', '=', obj_uuid]],
647             order="created_at asc"
648             ).execute(num_retries=args.retries)
649
650         items = srccol.get("items")
651
652         if not items:
653             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
654             return
655
656         c = None
657
658         if len(items) == 1:
659             # There's only one collection with the PDH, so use that.
660             c = items[0]
661         if not c:
662             # See if there is a collection that's in the same project
663             # as the root item (usually a pipeline) being copied.
664             for i in items:
665                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
666                     c = i
667                     break
668         if not c:
669             # Didn't find any collections located in the same project, so
670             # pick the oldest collection that has a name assigned to it.
671             for i in items:
672                 if i.get("name"):
673                     c = i
674                     break
675         if not c:
676             # None of the collections have names (?!), so just pick the
677             # first one.
678             c = items[0]
679
680         # list() doesn't return manifest text (and we don't want it to,
681         # because we don't need the same maninfest text sent to us 50
682         # times) so go and retrieve the collection object directly
683         # which will include the manifest text.
684         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
685     else:
686         # Assume this is an actual collection uuid, so fetch it directly.
687         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
688
689     # If a collection with this hash already exists at the
690     # destination, and 'force' is not true, just return that
691     # collection.
692     if not args.force:
693         if 'portable_data_hash' in c:
694             colhash = c['portable_data_hash']
695         else:
696             colhash = c['uuid']
697         dstcol = dst.collections().list(
698             filters=[['portable_data_hash', '=', colhash]]
699         ).execute(num_retries=args.retries)
700         if dstcol['items_available'] > 0:
701             for d in dstcol['items']:
702                 if ((args.project_uuid == d['owner_uuid']) and
703                     (c.get('name') == d['name']) and
704                     (c['portable_data_hash'] == d['portable_data_hash'])):
705                     return d
706             c['manifest_text'] = dst.collections().get(
707                 uuid=dstcol['items'][0]['uuid']
708             ).execute(num_retries=args.retries)['manifest_text']
709             return create_collection_from(c, src, dst, args)
710
711     # Fetch the collection's manifest.
712     manifest = c['manifest_text']
713     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
714
715     # Copy each block from src_keep to dst_keep.
716     # Use the newly signed locators returned from dst_keep to build
717     # a new manifest as we go.
718     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
719     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
720     dst_manifest = ""
721     dst_locators = {}
722     bytes_written = 0
723     bytes_expected = total_collection_size(manifest)
724     if args.progress:
725         progress_writer = ProgressWriter(human_progress)
726     else:
727         progress_writer = None
728
729     for line in manifest.splitlines():
730         words = line.split()
731         dst_manifest += words[0]
732         for word in words[1:]:
733             try:
734                 loc = arvados.KeepLocator(word)
735             except ValueError:
736                 # If 'word' can't be parsed as a locator,
737                 # presume it's a filename.
738                 dst_manifest += ' ' + word
739                 continue
740             blockhash = loc.md5sum
741             # copy this block if we haven't seen it before
742             # (otherwise, just reuse the existing dst_locator)
743             if blockhash not in dst_locators:
744                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
745                 if progress_writer:
746                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
747                 data = src_keep.get(word)
748                 dst_locator = dst_keep.put(data)
749                 dst_locators[blockhash] = dst_locator
750                 bytes_written += loc.size
751             dst_manifest += ' ' + dst_locators[blockhash]
752         dst_manifest += "\n"
753
754     if progress_writer:
755         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
756         progress_writer.finish()
757
758     # Copy the manifest and save the collection.
759     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
760
761     c['manifest_text'] = dst_manifest
762     return create_collection_from(c, src, dst, args)
763
764 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
765     r = api.repositories().list(
766         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
767     if r['items_available'] != 1:
768         raise Exception('cannot identify repo {}; {} repos found'
769                         .format(repo_name, r['items_available']))
770
771     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
772     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
773     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
774
775     priority = https_url + other_url + http_url
776
777     git_config = []
778     git_url = None
779     for url in priority:
780         if url.startswith("http"):
781             u = urllib.parse.urlsplit(url)
782             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
783             git_config = ["-c", "credential.%s/.username=none" % baseurl,
784                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
785         else:
786             git_config = []
787
788         try:
789             logger.debug("trying %s", url)
790             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
791                                       env={"HOME": os.environ["HOME"],
792                                            "ARVADOS_API_TOKEN": api.api_token,
793                                            "GIT_ASKPASS": "/bin/false"})
794         except arvados.errors.CommandFailedError:
795             pass
796         else:
797             git_url = url
798             break
799
800     if not git_url:
801         raise Exception('Cannot access git repository, tried {}'
802                         .format(priority))
803
804     if git_url.startswith("http:"):
805         if allow_insecure_http:
806             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
807         else:
808             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
809
810     return (git_url, git_config)
811
812
813 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
814 #
815 #    Copies commits from git repository 'src_git_repo' on Arvados
816 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
817 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
818 #    or "jsmith")
819 #
820 #    All commits will be copied to a destination branch named for the
821 #    source repository URL.
822 #
823 #    The destination repository must already exist.
824 #
825 #    The user running this command must be authenticated
826 #    to both repositories.
827 #
828 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
829     # Identify the fetch and push URLs for the git repositories.
830
831     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
832     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
833
834     logger.debug('src_git_url: {}'.format(src_git_url))
835     logger.debug('dst_git_url: {}'.format(dst_git_url))
836
837     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
838
839     # Copy git commits from src repo to dst repo.
840     if src_git_repo not in local_repo_dir:
841         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
842         arvados.util.run_command(
843             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
844              local_repo_dir[src_git_repo]],
845             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
846             env={"HOME": os.environ["HOME"],
847                  "ARVADOS_API_TOKEN": src.api_token,
848                  "GIT_ASKPASS": "/bin/false"})
849         arvados.util.run_command(
850             ["git", "remote", "add", "dst", dst_git_url],
851             cwd=local_repo_dir[src_git_repo])
852     arvados.util.run_command(
853         ["git", "branch", dst_branch, script_version],
854         cwd=local_repo_dir[src_git_repo])
855     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
856                              cwd=local_repo_dir[src_git_repo],
857                              env={"HOME": os.environ["HOME"],
858                                   "ARVADOS_API_TOKEN": dst.api_token,
859                                   "GIT_ASKPASS": "/bin/false"})
860
861 def copy_docker_images(pipeline, src, dst, args):
862     """Copy any docker images named in the pipeline components'
863     runtime_constraints field from src to dst."""
864
865     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
866     for c_name, c_info in pipeline['components'].items():
867         if ('runtime_constraints' in c_info and
868             'docker_image' in c_info['runtime_constraints']):
869             copy_docker_image(
870                 c_info['runtime_constraints']['docker_image'],
871                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
872                 src, dst, args)
873
874
875 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
876     """Copy the docker image identified by docker_image and
877     docker_image_tag from src to dst. Create appropriate
878     docker_image_repo+tag and docker_image_hash links at dst.
879
880     """
881
882     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
883
884     # Find the link identifying this docker image.
885     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
886         src, args.retries, docker_image, docker_image_tag)
887     if docker_image_list:
888         image_uuid, image_info = docker_image_list[0]
889         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
890
891         # Copy the collection it refers to.
892         dst_image_col = copy_collection(image_uuid, src, dst, args)
893     elif arvados.util.keep_locator_pattern.match(docker_image):
894         dst_image_col = copy_collection(docker_image, src, dst, args)
895     else:
896         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
897
898 # git_rev_parse(rev, repo)
899 #
900 #    Returns the 40-character commit hash corresponding to 'rev' in
901 #    git repository 'repo' (which must be the path of a local git
902 #    repository)
903 #
904 def git_rev_parse(rev, repo):
905     gitout, giterr = arvados.util.run_command(
906         ['git', 'rev-parse', rev], cwd=repo)
907     return gitout.strip()
908
909 # uuid_type(api, object_uuid)
910 #
911 #    Returns the name of the class that object_uuid belongs to, based on
912 #    the second field of the uuid.  This function consults the api's
913 #    schema to identify the object class.
914 #
915 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
916 #
917 #    Special case: if handed a Keep locator hash, return 'Collection'.
918 #
919 def uuid_type(api, object_uuid):
920     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
921         return 'Collection'
922     p = object_uuid.split('-')
923     if len(p) == 3:
924         type_prefix = p[1]
925         for k in api._schema.schemas:
926             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
927             if type_prefix == obj_class:
928                 return k
929     return None
930
931 def abort(msg, code=1):
932     logger.info("arv-copy: %s", msg)
933     exit(code)
934
935
936 # Code for reporting on the progress of a collection upload.
937 # Stolen from arvados.commands.put.ArvPutCollectionWriter
938 # TODO(twp): figure out how to refactor into a shared library
939 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
940 # code)
941
942 def machine_progress(obj_uuid, bytes_written, bytes_expected):
943     return "{} {}: {} {} written {} total\n".format(
944         sys.argv[0],
945         os.getpid(),
946         obj_uuid,
947         bytes_written,
948         -1 if (bytes_expected is None) else bytes_expected)
949
950 def human_progress(obj_uuid, bytes_written, bytes_expected):
951     if bytes_expected:
952         return "\r{}: {}M / {}M {:.1%} ".format(
953             obj_uuid,
954             bytes_written >> 20, bytes_expected >> 20,
955             float(bytes_written) / bytes_expected)
956     else:
957         return "\r{}: {} ".format(obj_uuid, bytes_written)
958
959 class ProgressWriter(object):
960     _progress_func = None
961     outfile = sys.stderr
962
963     def __init__(self, progress_func):
964         self._progress_func = progress_func
965
966     def report(self, obj_uuid, bytes_written, bytes_expected):
967         if self._progress_func is not None:
968             self.outfile.write(
969                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
970
971     def finish(self):
972         self.outfile.write("\n")
973
974 if __name__ == '__main__':
975     main()