10110: arv-copy workflows
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import contextlib
21 import getpass
22 import os
23 import re
24 import shutil
25 import sys
26 import logging
27 import tempfile
28 import urlparse
29
30 import arvados
31 import arvados.config
32 import arvados.keep
33 import arvados.util
34 import arvados.commands._util as arv_cmd
35 import arvados.commands.keepdocker
36 import ruamel.yaml as yaml
37
38 from arvados.api import OrderedJsonModel
39 from arvados._version import __version__
40
41 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
42
43 logger = logging.getLogger('arvados.arv-copy')
44
45 # local_repo_dir records which git repositories from the Arvados source
46 # instance have been checked out locally during this run, and to which
47 # directories.
48 # e.g. if repository 'twp' from src_arv has been cloned into
49 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
50 #
51 local_repo_dir = {}
52
53 # List of collections that have been copied in this session, and their
54 # destination collection UUIDs.
55 collections_copied = {}
56
57 # Set of (repository, script_version) two-tuples of commits copied in git.
58 scripts_copied = set()
59
60 # The owner_uuid of the object being copied
61 src_owner_uuid = None
62
63 def main():
64     copy_opts = argparse.ArgumentParser(add_help=False)
65
66     copy_opts.add_argument(
67         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
68         help='Print version and exit.')
69     copy_opts.add_argument(
70         '-v', '--verbose', dest='verbose', action='store_true',
71         help='Verbose output.')
72     copy_opts.add_argument(
73         '--progress', dest='progress', action='store_true',
74         help='Report progress on copying collections. (default)')
75     copy_opts.add_argument(
76         '--no-progress', dest='progress', action='store_false',
77         help='Do not report progress on copying collections.')
78     copy_opts.add_argument(
79         '-f', '--force', dest='force', action='store_true',
80         help='Perform copy even if the object appears to exist at the remote destination.')
81     copy_opts.add_argument(
82         '--force-filters', action='store_true', default=False,
83         help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
84     copy_opts.add_argument(
85         '--src', dest='source_arvados', required=True,
86         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
87     copy_opts.add_argument(
88         '--dst', dest='destination_arvados', required=True,
89         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
90     copy_opts.add_argument(
91         '--recursive', dest='recursive', action='store_true',
92         help='Recursively copy any dependencies for this object. (default)')
93     copy_opts.add_argument(
94         '--no-recursive', dest='recursive', action='store_false',
95         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
96     copy_opts.add_argument(
97         '--dst-git-repo', dest='dst_git_repo',
98         help='The name of the destination git repository. Required when copying a pipeline recursively.')
99     copy_opts.add_argument(
100         '--project-uuid', dest='project_uuid',
101         help='The UUID of the project at the destination to which the pipeline should be copied.')
102     copy_opts.add_argument(
103         '--allow-git-http-src', action="store_true",
104         help='Allow cloning git repositories over insecure http')
105     copy_opts.add_argument(
106         '--allow-git-http-dst', action="store_true",
107         help='Allow pushing git repositories over insecure http')
108
109     copy_opts.add_argument(
110         'object_uuid',
111         help='The UUID of the object to be copied.')
112     copy_opts.set_defaults(progress=True)
113     copy_opts.set_defaults(recursive=True)
114
115     parser = argparse.ArgumentParser(
116         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
117         parents=[copy_opts, arv_cmd.retry_opt])
118     args = parser.parse_args()
119
120     if args.verbose:
121         logger.setLevel(logging.DEBUG)
122     else:
123         logger.setLevel(logging.INFO)
124
125     # Create API clients for the source and destination instances
126     src_arv = api_for_instance(args.source_arvados)
127     dst_arv = api_for_instance(args.destination_arvados)
128
129     if not args.project_uuid:
130         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
131
132     # Identify the kind of object we have been given, and begin copying.
133     t = uuid_type(src_arv, args.object_uuid)
134     if t == 'Collection':
135         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
136         result = copy_collection(args.object_uuid,
137                                  src_arv, dst_arv,
138                                  args)
139     elif t == 'PipelineInstance':
140         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
141         result = copy_pipeline_instance(args.object_uuid,
142                                         src_arv, dst_arv,
143                                         args)
144     elif t == 'PipelineTemplate':
145         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
146         result = copy_pipeline_template(args.object_uuid,
147                                         src_arv, dst_arv, args)
148     elif t == 'Workflow':
149         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
150         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
151     else:
152         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
153
154     # Clean up any outstanding temp git repositories.
155     for d in local_repo_dir.values():
156         shutil.rmtree(d, ignore_errors=True)
157
158     # If no exception was thrown and the response does not have an
159     # error_token field, presume success
160     if 'error_token' in result or 'uuid' not in result:
161         logger.error("API server returned an error result: {}".format(result))
162         exit(1)
163
164     logger.info("")
165     logger.info("Success: created copy with uuid {}".format(result['uuid']))
166     exit(0)
167
168 def set_src_owner_uuid(resource, uuid, args):
169     global src_owner_uuid
170     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
171     src_owner_uuid = c.get("owner_uuid")
172
173 # api_for_instance(instance_name)
174 #
175 #     Creates an API client for the Arvados instance identified by
176 #     instance_name.
177 #
178 #     If instance_name contains a slash, it is presumed to be a path
179 #     (either local or absolute) to a file with Arvados configuration
180 #     settings.
181 #
182 #     Otherwise, it is presumed to be the name of a file in
183 #     $HOME/.config/arvados/instance_name.conf
184 #
185 def api_for_instance(instance_name):
186     if '/' in instance_name:
187         config_file = instance_name
188     else:
189         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
190
191     try:
192         cfg = arvados.config.load(config_file)
193     except (IOError, OSError) as e:
194         abort(("Could not open config file {}: {}\n" +
195                "You must make sure that your configuration tokens\n" +
196                "for Arvados instance {} are in {} and that this\n" +
197                "file is readable.").format(
198                    config_file, e, instance_name, config_file))
199
200     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
201         api_is_insecure = (
202             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
203                 ['1', 't', 'true', 'y', 'yes']))
204         client = arvados.api('v1',
205                              host=cfg['ARVADOS_API_HOST'],
206                              token=cfg['ARVADOS_API_TOKEN'],
207                              insecure=api_is_insecure,
208                              model=OrderedJsonModel())
209     else:
210         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
211     return client
212
213 # Check if git is available
214 def check_git_availability():
215     try:
216         arvados.util.run_command(['git', '--help'])
217     except Exception:
218         abort('git command is not available. Please ensure git is installed.')
219
220 # copy_pipeline_instance(pi_uuid, src, dst, args)
221 #
222 #    Copies a pipeline instance identified by pi_uuid from src to dst.
223 #
224 #    If the args.recursive option is set:
225 #      1. Copies all input collections
226 #           * For each component in the pipeline, include all collections
227 #             listed as job dependencies for that component)
228 #      2. Copy docker images
229 #      3. Copy git repositories
230 #      4. Copy the pipeline template
231 #
232 #    The only changes made to the copied pipeline instance are:
233 #      1. The original pipeline instance UUID is preserved in
234 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
235 #      2. The pipeline_template_uuid is changed to the new template uuid.
236 #      3. The owner_uuid of the instance is changed to the user who
237 #         copied it.
238 #
239 def copy_pipeline_instance(pi_uuid, src, dst, args):
240     # Fetch the pipeline instance record.
241     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
242
243     if args.recursive:
244         check_git_availability()
245
246         if not args.dst_git_repo:
247             abort('--dst-git-repo is required when copying a pipeline recursively.')
248         # Copy the pipeline template and save the copied template.
249         if pi.get('pipeline_template_uuid', None):
250             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
251                                         src, dst, args)
252
253         # Copy input collections, docker images and git repos.
254         pi = copy_collections(pi, src, dst, args)
255         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
256         copy_docker_images(pi, src, dst, args)
257
258         # Update the fields of the pipeline instance with the copied
259         # pipeline template.
260         if pi.get('pipeline_template_uuid', None):
261             pi['pipeline_template_uuid'] = pt['uuid']
262
263     else:
264         # not recursive
265         logger.info("Copying only pipeline instance %s.", pi_uuid)
266         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
267
268     # Update the pipeline instance properties, and create the new
269     # instance at dst.
270     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
271     pi['description'] = "Pipeline copied from {}\n\n{}".format(
272         pi_uuid,
273         pi['description'] if pi.get('description', None) else '')
274
275     pi['owner_uuid'] = args.project_uuid
276
277     del pi['uuid']
278
279     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
280     return new_pi
281
282 def filter_iter(arg):
283     """Iterate a filter string-or-list.
284
285     Pass in a filter field that can either be a string or list.
286     This will iterate elements as if the field had been written as a list.
287     """
288     if isinstance(arg, basestring):
289         return iter((arg,))
290     else:
291         return iter(arg)
292
293 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
294     """Update a single repository filter in-place for the destination.
295
296     If the filter checks that the repository is src_repository, it is
297     updated to check that the repository is dst_repository.  If it does
298     anything else, this function raises ValueError.
299     """
300     if src_repository is None:
301         raise ValueError("component does not specify a source repository")
302     elif dst_repository is None:
303         raise ValueError("no destination repository specified to update repository filter")
304     elif repo_filter[1:] == ['=', src_repository]:
305         repo_filter[2] = dst_repository
306     elif repo_filter[1:] == ['in', [src_repository]]:
307         repo_filter[2] = [dst_repository]
308     else:
309         raise ValueError("repository filter is not a simple source match")
310
311 def migrate_script_version_filter(version_filter):
312     """Update a single script_version filter in-place for the destination.
313
314     Currently this function checks that all the filter operands are Git
315     commit hashes.  If they're not, it raises ValueError to indicate that
316     the filter is not portable.  It could be extended to make other
317     transformations in the future.
318     """
319     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
320         raise ValueError("script_version filter is not limited to commit hashes")
321
322 def attr_filtered(filter_, *attr_names):
323     """Return True if filter_ applies to any of attr_names, else False."""
324     return any((name == 'any') or (name in attr_names)
325                for name in filter_iter(filter_[0]))
326
327 @contextlib.contextmanager
328 def exception_handler(handler, *exc_types):
329     """If any exc_types are raised in the block, call handler on the exception."""
330     try:
331         yield
332     except exc_types as error:
333         handler(error)
334
335 def migrate_components_filters(template_components, dst_git_repo):
336     """Update template component filters in-place for the destination.
337
338     template_components is a dictionary of components in a pipeline template.
339     This method walks over each component's filters, and updates them to have
340     identical semantics on the destination cluster.  It returns a list of
341     error strings that describe what filters could not be updated safely.
342
343     dst_git_repo is the name of the destination Git repository, which can
344     be None if that is not known.
345     """
346     errors = []
347     for cname, cspec in template_components.iteritems():
348         def add_error(errmsg):
349             errors.append("{}: {}".format(cname, errmsg))
350         if not isinstance(cspec, dict):
351             add_error("value is not a component definition")
352             continue
353         src_repository = cspec.get('repository')
354         filters = cspec.get('filters', [])
355         if not isinstance(filters, list):
356             add_error("filters are not a list")
357             continue
358         for cfilter in filters:
359             if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
360                 add_error("malformed filter {!r}".format(cfilter))
361                 continue
362             if attr_filtered(cfilter, 'repository'):
363                 with exception_handler(add_error, ValueError):
364                     migrate_repository_filter(cfilter, src_repository, dst_git_repo)
365             if attr_filtered(cfilter, 'script_version'):
366                 with exception_handler(add_error, ValueError):
367                     migrate_script_version_filter(cfilter)
368     return errors
369
370 # copy_pipeline_template(pt_uuid, src, dst, args)
371 #
372 #    Copies a pipeline template identified by pt_uuid from src to dst.
373 #
374 #    If args.recursive is True, also copy any collections, docker
375 #    images and git repositories that this template references.
376 #
377 #    The owner_uuid of the new template is changed to that of the user
378 #    who copied the template.
379 #
380 #    Returns the copied pipeline template object.
381 #
382 def copy_pipeline_template(pt_uuid, src, dst, args):
383     # fetch the pipeline template from the source instance
384     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
385
386     if not args.force_filters:
387         filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
388         if filter_errors:
389             abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
390                   "\n".join(filter_errors))
391
392     if args.recursive:
393         check_git_availability()
394
395         if not args.dst_git_repo:
396             abort('--dst-git-repo is required when copying a pipeline recursively.')
397         # Copy input collections, docker images and git repos.
398         pt = copy_collections(pt, src, dst, args)
399         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
400         copy_docker_images(pt, src, dst, args)
401
402     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
403         pt_uuid,
404         pt['description'] if pt.get('description', None) else '')
405     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
406     del pt['uuid']
407
408     pt['owner_uuid'] = args.project_uuid
409
410     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
411
412 # copy_workflow(wf_uuid, src, dst, args)
413 #
414 #    Copies a workflow identified by wf_uuid from src to dst.
415 #
416 #    If args.recursive is True, also copy any collections
417 #      referenced in the workflow definition yaml.
418 #
419 #    The owner_uuid of the new workflow is set to any given
420 #      project_uuid or the user who copied the template.
421 #
422 #    Returns the copied workflow object.
423 #
424 def copy_workflow(wf_uuid, src, dst, args):
425     # fetch the workflow from the source instance
426     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
427
428     if args.recursive:
429         wf_def = yaml.safe_load(wf["definition"])
430         if wf_def is not None:
431             colls = []
432             graph = wf_def.get('$graph', None)
433             if graph is not None:
434                 workflow_collections(graph, colls)
435             else:
436                 workflow_collections(wf_def, colls)
437             copy_collections(colls, src, dst, args)
438
439     del wf['uuid']
440     wf['owner_uuid'] = args.project_uuid
441
442     return dst.workflows().create(body=wf).execute(num_retries=args.retries)
443
444 def workflow_collections(obj, colls):
445     if isinstance(obj, dict):
446         loc = obj.get('location', None)
447         if loc is not None:
448             if loc.startswith("keep:"):
449                 colls.append(loc[5:])
450         for x in obj:
451             workflow_collections(obj[x], colls)
452     if isinstance(obj, list):
453         for x in obj:
454             workflow_collections(x, colls)
455
456 # copy_collections(obj, src, dst, args)
457 #
458 #    Recursively copies all collections referenced by 'obj' from src
459 #    to dst.  obj may be a dict or a list, in which case we run
460 #    copy_collections on every value it contains. If it is a string,
461 #    search it for any substring that matches a collection hash or uuid
462 #    (this will find hidden references to collections like
463 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
464 #
465 #    Returns a copy of obj with any old collection uuids replaced by
466 #    the new ones.
467 #
468 def copy_collections(obj, src, dst, args):
469
470     def copy_collection_fn(collection_match):
471         """Helper function for regex substitution: copies a single collection,
472         identified by the collection_match MatchObject, to the
473         destination.  Returns the destination collection uuid (or the
474         portable data hash if that's what src_id is).
475
476         """
477         src_id = collection_match.group(0)
478         if src_id not in collections_copied:
479             dst_col = copy_collection(src_id, src, dst, args)
480             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
481                 collections_copied[src_id] = src_id
482             else:
483                 collections_copied[src_id] = dst_col['uuid']
484         return collections_copied[src_id]
485
486     if isinstance(obj, basestring):
487         # Copy any collections identified in this string to dst, replacing
488         # them with the dst uuids as necessary.
489         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
490         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
491         return obj
492     elif isinstance(obj, dict):
493         return type(obj)((v, copy_collections(obj[v], src, dst, args))
494                          for v in obj)
495     elif isinstance(obj, list):
496         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
497     return obj
498
499 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
500     """Copy a job's script to the destination repository, and update its record.
501
502     Given a jobspec dictionary, this function finds the referenced script from
503     src and copies it to dst and dst_repo.  It also updates jobspec in place to
504     refer to names on the destination.
505     """
506     repo = jobspec.get('repository')
507     if repo is None:
508         return
509     # script_version is the "script_version" parameter from the source
510     # component or job.  If no script_version was supplied in the
511     # component or job, it is a mistake in the pipeline, but for the
512     # purposes of copying the repository, default to "master".
513     script_version = jobspec.get('script_version') or 'master'
514     script_key = (repo, script_version)
515     if script_key not in scripts_copied:
516         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
517         scripts_copied.add(script_key)
518     jobspec['repository'] = dst_repo
519     repo_dir = local_repo_dir[repo]
520     for version_key in ['script_version', 'supplied_script_version']:
521         if version_key in jobspec:
522             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
523
524 # copy_git_repos(p, src, dst, dst_repo, args)
525 #
526 #    Copies all git repositories referenced by pipeline instance or
527 #    template 'p' from src to dst.
528 #
529 #    For each component c in the pipeline:
530 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
531 #      * Rename script versions:
532 #          * c['script_version']
533 #          * c['job']['script_version']
534 #          * c['job']['supplied_script_version']
535 #        to the commit hashes they resolve to, since any symbolic
536 #        names (tags, branches) are not preserved in the destination repo.
537 #
538 #    The pipeline object is updated in place with the new repository
539 #    names.  The return value is undefined.
540 #
541 def copy_git_repos(p, src, dst, dst_repo, args):
542     for component in p['components'].itervalues():
543         migrate_jobspec(component, src, dst, dst_repo, args)
544         if 'job' in component:
545             migrate_jobspec(component['job'], src, dst, dst_repo, args)
546
547 def total_collection_size(manifest_text):
548     """Return the total number of bytes in this collection (excluding
549     duplicate blocks)."""
550
551     total_bytes = 0
552     locators_seen = {}
553     for line in manifest_text.splitlines():
554         words = line.split()
555         for word in words[1:]:
556             try:
557                 loc = arvados.KeepLocator(word)
558             except ValueError:
559                 continue  # this word isn't a locator, skip it
560             if loc.md5sum not in locators_seen:
561                 locators_seen[loc.md5sum] = True
562                 total_bytes += loc.size
563
564     return total_bytes
565
566 def create_collection_from(c, src, dst, args):
567     """Create a new collection record on dst, and copy Docker metadata if
568     available."""
569
570     collection_uuid = c['uuid']
571     del c['uuid']
572
573     if not c["name"]:
574         c['name'] = "copied from " + collection_uuid
575
576     if 'properties' in c:
577         del c['properties']
578
579     c['owner_uuid'] = args.project_uuid
580
581     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
582
583     # Create docker_image_repo+tag and docker_image_hash links
584     # at the destination.
585     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
586         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
587
588         for src_link in docker_links:
589             body = {key: src_link[key]
590                     for key in ['link_class', 'name', 'properties']}
591             body['head_uuid'] = dst_collection['uuid']
592             body['owner_uuid'] = args.project_uuid
593
594             lk = dst.links().create(body=body).execute(num_retries=args.retries)
595             logger.debug('created dst link {}'.format(lk))
596
597     return dst_collection
598
599 # copy_collection(obj_uuid, src, dst, args)
600 #
601 #    Copies the collection identified by obj_uuid from src to dst.
602 #    Returns the collection object created at dst.
603 #
604 #    If args.progress is True, produce a human-friendly progress
605 #    report.
606 #
607 #    If a collection with the desired portable_data_hash already
608 #    exists at dst, and args.force is False, copy_collection returns
609 #    the existing collection without copying any blocks.  Otherwise
610 #    (if no collection exists or if args.force is True)
611 #    copy_collection copies all of the collection data blocks from src
612 #    to dst.
613 #
614 #    For this application, it is critical to preserve the
615 #    collection's manifest hash, which is not guaranteed with the
616 #    arvados.CollectionReader and arvados.CollectionWriter classes.
617 #    Copying each block in the collection manually, followed by
618 #    the manifest block, ensures that the collection's manifest
619 #    hash will not change.
620 #
621 def copy_collection(obj_uuid, src, dst, args):
622     if arvados.util.keep_locator_pattern.match(obj_uuid):
623         # If the obj_uuid is a portable data hash, it might not be uniquely
624         # identified with a particular collection.  As a result, it is
625         # ambigious as to what name to use for the copy.  Apply some heuristics
626         # to pick which collection to get the name from.
627         srccol = src.collections().list(
628             filters=[['portable_data_hash', '=', obj_uuid]],
629             order="created_at asc"
630             ).execute(num_retries=args.retries)
631
632         items = srccol.get("items")
633
634         if not items:
635             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
636             return
637
638         c = None
639
640         if len(items) == 1:
641             # There's only one collection with the PDH, so use that.
642             c = items[0]
643         if not c:
644             # See if there is a collection that's in the same project
645             # as the root item (usually a pipeline) being copied.
646             for i in items:
647                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
648                     c = i
649                     break
650         if not c:
651             # Didn't find any collections located in the same project, so
652             # pick the oldest collection that has a name assigned to it.
653             for i in items:
654                 if i.get("name"):
655                     c = i
656                     break
657         if not c:
658             # None of the collections have names (?!), so just pick the
659             # first one.
660             c = items[0]
661
662         # list() doesn't return manifest text (and we don't want it to,
663         # because we don't need the same maninfest text sent to us 50
664         # times) so go and retrieve the collection object directly
665         # which will include the manifest text.
666         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
667     else:
668         # Assume this is an actual collection uuid, so fetch it directly.
669         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
670
671     # If a collection with this hash already exists at the
672     # destination, and 'force' is not true, just return that
673     # collection.
674     if not args.force:
675         if 'portable_data_hash' in c:
676             colhash = c['portable_data_hash']
677         else:
678             colhash = c['uuid']
679         dstcol = dst.collections().list(
680             filters=[['portable_data_hash', '=', colhash]]
681         ).execute(num_retries=args.retries)
682         if dstcol['items_available'] > 0:
683             for d in dstcol['items']:
684                 if ((args.project_uuid == d['owner_uuid']) and
685                     (c.get('name') == d['name']) and
686                     (c['portable_data_hash'] == d['portable_data_hash'])):
687                     return d
688             c['manifest_text'] = dst.collections().get(
689                 uuid=dstcol['items'][0]['uuid']
690             ).execute(num_retries=args.retries)['manifest_text']
691             return create_collection_from(c, src, dst, args)
692
693     # Fetch the collection's manifest.
694     manifest = c['manifest_text']
695     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
696
697     # Copy each block from src_keep to dst_keep.
698     # Use the newly signed locators returned from dst_keep to build
699     # a new manifest as we go.
700     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
701     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
702     dst_manifest = ""
703     dst_locators = {}
704     bytes_written = 0
705     bytes_expected = total_collection_size(manifest)
706     if args.progress:
707         progress_writer = ProgressWriter(human_progress)
708     else:
709         progress_writer = None
710
711     for line in manifest.splitlines():
712         words = line.split()
713         dst_manifest += words[0]
714         for word in words[1:]:
715             try:
716                 loc = arvados.KeepLocator(word)
717             except ValueError:
718                 # If 'word' can't be parsed as a locator,
719                 # presume it's a filename.
720                 dst_manifest += ' ' + word
721                 continue
722             blockhash = loc.md5sum
723             # copy this block if we haven't seen it before
724             # (otherwise, just reuse the existing dst_locator)
725             if blockhash not in dst_locators:
726                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
727                 if progress_writer:
728                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
729                 data = src_keep.get(word)
730                 dst_locator = dst_keep.put(data)
731                 dst_locators[blockhash] = dst_locator
732                 bytes_written += loc.size
733             dst_manifest += ' ' + dst_locators[blockhash]
734         dst_manifest += "\n"
735
736     if progress_writer:
737         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
738         progress_writer.finish()
739
740     # Copy the manifest and save the collection.
741     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
742
743     c['manifest_text'] = dst_manifest
744     return create_collection_from(c, src, dst, args)
745
746 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
747     r = api.repositories().list(
748         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
749     if r['items_available'] != 1:
750         raise Exception('cannot identify repo {}; {} repos found'
751                         .format(repo_name, r['items_available']))
752
753     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
754     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
755     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
756
757     priority = https_url + other_url + http_url
758
759     git_config = []
760     git_url = None
761     for url in priority:
762         if url.startswith("http"):
763             u = urlparse.urlsplit(url)
764             baseurl = urlparse.urlunsplit((u.scheme, u.netloc, "", "", ""))
765             git_config = ["-c", "credential.%s/.username=none" % baseurl,
766                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
767         else:
768             git_config = []
769
770         try:
771             logger.debug("trying %s", url)
772             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
773                                       env={"HOME": os.environ["HOME"],
774                                            "ARVADOS_API_TOKEN": api.api_token,
775                                            "GIT_ASKPASS": "/bin/false"})
776         except arvados.errors.CommandFailedError:
777             pass
778         else:
779             git_url = url
780             break
781
782     if not git_url:
783         raise Exception('Cannot access git repository, tried {}'
784                         .format(priority))
785
786     if git_url.startswith("http:"):
787         if allow_insecure_http:
788             logger.warn("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
789         else:
790             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
791
792     return (git_url, git_config)
793
794
795 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
796 #
797 #    Copies commits from git repository 'src_git_repo' on Arvados
798 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
799 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
800 #    or "jsmith")
801 #
802 #    All commits will be copied to a destination branch named for the
803 #    source repository URL.
804 #
805 #    The destination repository must already exist.
806 #
807 #    The user running this command must be authenticated
808 #    to both repositories.
809 #
810 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
811     # Identify the fetch and push URLs for the git repositories.
812
813     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
814     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
815
816     logger.debug('src_git_url: {}'.format(src_git_url))
817     logger.debug('dst_git_url: {}'.format(dst_git_url))
818
819     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
820
821     # Copy git commits from src repo to dst repo.
822     if src_git_repo not in local_repo_dir:
823         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
824         arvados.util.run_command(
825             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
826              local_repo_dir[src_git_repo]],
827             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
828             env={"HOME": os.environ["HOME"],
829                  "ARVADOS_API_TOKEN": src.api_token,
830                  "GIT_ASKPASS": "/bin/false"})
831         arvados.util.run_command(
832             ["git", "remote", "add", "dst", dst_git_url],
833             cwd=local_repo_dir[src_git_repo])
834     arvados.util.run_command(
835         ["git", "branch", dst_branch, script_version],
836         cwd=local_repo_dir[src_git_repo])
837     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
838                              cwd=local_repo_dir[src_git_repo],
839                              env={"HOME": os.environ["HOME"],
840                                   "ARVADOS_API_TOKEN": dst.api_token,
841                                   "GIT_ASKPASS": "/bin/false"})
842
843 def copy_docker_images(pipeline, src, dst, args):
844     """Copy any docker images named in the pipeline components'
845     runtime_constraints field from src to dst."""
846
847     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
848     for c_name, c_info in pipeline['components'].iteritems():
849         if ('runtime_constraints' in c_info and
850             'docker_image' in c_info['runtime_constraints']):
851             copy_docker_image(
852                 c_info['runtime_constraints']['docker_image'],
853                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
854                 src, dst, args)
855
856
857 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
858     """Copy the docker image identified by docker_image and
859     docker_image_tag from src to dst. Create appropriate
860     docker_image_repo+tag and docker_image_hash links at dst.
861
862     """
863
864     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
865
866     # Find the link identifying this docker image.
867     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
868         src, args.retries, docker_image, docker_image_tag)
869     if docker_image_list:
870         image_uuid, image_info = docker_image_list[0]
871         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
872
873         # Copy the collection it refers to.
874         dst_image_col = copy_collection(image_uuid, src, dst, args)
875     elif arvados.util.keep_locator_pattern.match(docker_image):
876         dst_image_col = copy_collection(docker_image, src, dst, args)
877     else:
878         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
879
880 # git_rev_parse(rev, repo)
881 #
882 #    Returns the 40-character commit hash corresponding to 'rev' in
883 #    git repository 'repo' (which must be the path of a local git
884 #    repository)
885 #
886 def git_rev_parse(rev, repo):
887     gitout, giterr = arvados.util.run_command(
888         ['git', 'rev-parse', rev], cwd=repo)
889     return gitout.strip()
890
891 # uuid_type(api, object_uuid)
892 #
893 #    Returns the name of the class that object_uuid belongs to, based on
894 #    the second field of the uuid.  This function consults the api's
895 #    schema to identify the object class.
896 #
897 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
898 #
899 #    Special case: if handed a Keep locator hash, return 'Collection'.
900 #
901 def uuid_type(api, object_uuid):
902     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
903         return 'Collection'
904     p = object_uuid.split('-')
905     if len(p) == 3:
906         type_prefix = p[1]
907         for k in api._schema.schemas:
908             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
909             if type_prefix == obj_class:
910                 return k
911     return None
912
913 def abort(msg, code=1):
914     logger.info("arv-copy: %s", msg)
915     exit(code)
916
917
918 # Code for reporting on the progress of a collection upload.
919 # Stolen from arvados.commands.put.ArvPutCollectionWriter
920 # TODO(twp): figure out how to refactor into a shared library
921 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
922 # code)
923
924 def machine_progress(obj_uuid, bytes_written, bytes_expected):
925     return "{} {}: {} {} written {} total\n".format(
926         sys.argv[0],
927         os.getpid(),
928         obj_uuid,
929         bytes_written,
930         -1 if (bytes_expected is None) else bytes_expected)
931
932 def human_progress(obj_uuid, bytes_written, bytes_expected):
933     if bytes_expected:
934         return "\r{}: {}M / {}M {:.1%} ".format(
935             obj_uuid,
936             bytes_written >> 20, bytes_expected >> 20,
937             float(bytes_written) / bytes_expected)
938     else:
939         return "\r{}: {} ".format(obj_uuid, bytes_written)
940
941 class ProgressWriter(object):
942     _progress_func = None
943     outfile = sys.stderr
944
945     def __init__(self, progress_func):
946         self._progress_func = progress_func
947
948     def report(self, obj_uuid, bytes_written, bytes_expected):
949         if self._progress_func is not None:
950             self.outfile.write(
951                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
952
953     def finish(self):
954         self.outfile.write("\n")
955
956 if __name__ == '__main__':
957     main()