Merge branch 'master' into wtsi-hgi-8087-arv-cli-request-body-from-file
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import contextlib
21 import getpass
22 import os
23 import re
24 import shutil
25 import sys
26 import logging
27 import tempfile
28 import urlparse
29
30 import arvados
31 import arvados.config
32 import arvados.keep
33 import arvados.util
34 import arvados.commands._util as arv_cmd
35 import arvados.commands.keepdocker
36
37 from arvados.api import OrderedJsonModel
38
39 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
40
41 logger = logging.getLogger('arvados.arv-copy')
42
43 # local_repo_dir records which git repositories from the Arvados source
44 # instance have been checked out locally during this run, and to which
45 # directories.
46 # e.g. if repository 'twp' from src_arv has been cloned into
47 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
48 #
49 local_repo_dir = {}
50
51 # List of collections that have been copied in this session, and their
52 # destination collection UUIDs.
53 collections_copied = {}
54
55 # Set of (repository, script_version) two-tuples of commits copied in git.
56 scripts_copied = set()
57
58 # The owner_uuid of the object being copied
59 src_owner_uuid = None
60
61 def main():
62     copy_opts = argparse.ArgumentParser(add_help=False)
63
64     copy_opts.add_argument(
65         '-v', '--verbose', dest='verbose', action='store_true',
66         help='Verbose output.')
67     copy_opts.add_argument(
68         '--progress', dest='progress', action='store_true',
69         help='Report progress on copying collections. (default)')
70     copy_opts.add_argument(
71         '--no-progress', dest='progress', action='store_false',
72         help='Do not report progress on copying collections.')
73     copy_opts.add_argument(
74         '-f', '--force', dest='force', action='store_true',
75         help='Perform copy even if the object appears to exist at the remote destination.')
76     copy_opts.add_argument(
77         '--force-filters', action='store_true', default=False,
78         help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
79     copy_opts.add_argument(
80         '--src', dest='source_arvados', required=True,
81         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
82     copy_opts.add_argument(
83         '--dst', dest='destination_arvados', required=True,
84         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
85     copy_opts.add_argument(
86         '--recursive', dest='recursive', action='store_true',
87         help='Recursively copy any dependencies for this object. (default)')
88     copy_opts.add_argument(
89         '--no-recursive', dest='recursive', action='store_false',
90         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
91     copy_opts.add_argument(
92         '--dst-git-repo', dest='dst_git_repo',
93         help='The name of the destination git repository. Required when copying a pipeline recursively.')
94     copy_opts.add_argument(
95         '--project-uuid', dest='project_uuid',
96         help='The UUID of the project at the destination to which the pipeline should be copied.')
97     copy_opts.add_argument(
98         '--allow-git-http-src', action="store_true",
99         help='Allow cloning git repositories over insecure http')
100     copy_opts.add_argument(
101         '--allow-git-http-dst', action="store_true",
102         help='Allow pushing git repositories over insecure http')
103
104     copy_opts.add_argument(
105         'object_uuid',
106         help='The UUID of the object to be copied.')
107     copy_opts.set_defaults(progress=True)
108     copy_opts.set_defaults(recursive=True)
109
110     parser = argparse.ArgumentParser(
111         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
112         parents=[copy_opts, arv_cmd.retry_opt])
113     args = parser.parse_args()
114
115     if args.verbose:
116         logger.setLevel(logging.DEBUG)
117     else:
118         logger.setLevel(logging.INFO)
119
120     # Create API clients for the source and destination instances
121     src_arv = api_for_instance(args.source_arvados)
122     dst_arv = api_for_instance(args.destination_arvados)
123
124     if not args.project_uuid:
125         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
126
127     # Identify the kind of object we have been given, and begin copying.
128     t = uuid_type(src_arv, args.object_uuid)
129     if t == 'Collection':
130         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
131         result = copy_collection(args.object_uuid,
132                                  src_arv, dst_arv,
133                                  args)
134     elif t == 'PipelineInstance':
135         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
136         result = copy_pipeline_instance(args.object_uuid,
137                                         src_arv, dst_arv,
138                                         args)
139     elif t == 'PipelineTemplate':
140         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
141         result = copy_pipeline_template(args.object_uuid,
142                                         src_arv, dst_arv, args)
143     else:
144         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
145
146     # Clean up any outstanding temp git repositories.
147     for d in local_repo_dir.values():
148         shutil.rmtree(d, ignore_errors=True)
149
150     # If no exception was thrown and the response does not have an
151     # error_token field, presume success
152     if 'error_token' in result or 'uuid' not in result:
153         logger.error("API server returned an error result: {}".format(result))
154         exit(1)
155
156     logger.info("")
157     logger.info("Success: created copy with uuid {}".format(result['uuid']))
158     exit(0)
159
160 def set_src_owner_uuid(resource, uuid, args):
161     global src_owner_uuid
162     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
163     src_owner_uuid = c.get("owner_uuid")
164
165 # api_for_instance(instance_name)
166 #
167 #     Creates an API client for the Arvados instance identified by
168 #     instance_name.
169 #
170 #     If instance_name contains a slash, it is presumed to be a path
171 #     (either local or absolute) to a file with Arvados configuration
172 #     settings.
173 #
174 #     Otherwise, it is presumed to be the name of a file in
175 #     $HOME/.config/arvados/instance_name.conf
176 #
177 def api_for_instance(instance_name):
178     if '/' in instance_name:
179         config_file = instance_name
180     else:
181         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
182
183     try:
184         cfg = arvados.config.load(config_file)
185     except (IOError, OSError) as e:
186         abort(("Could not open config file {}: {}\n" +
187                "You must make sure that your configuration tokens\n" +
188                "for Arvados instance {} are in {} and that this\n" +
189                "file is readable.").format(
190                    config_file, e, instance_name, config_file))
191
192     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
193         api_is_insecure = (
194             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
195                 ['1', 't', 'true', 'y', 'yes']))
196         client = arvados.api('v1',
197                              host=cfg['ARVADOS_API_HOST'],
198                              token=cfg['ARVADOS_API_TOKEN'],
199                              insecure=api_is_insecure,
200                              model=OrderedJsonModel())
201     else:
202         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
203     return client
204
205 # Check if git is available
206 def check_git_availability():
207     try:
208         arvados.util.run_command(['git', '--help'])
209     except Exception:
210         abort('git command is not available. Please ensure git is installed.')
211
212 # copy_pipeline_instance(pi_uuid, src, dst, args)
213 #
214 #    Copies a pipeline instance identified by pi_uuid from src to dst.
215 #
216 #    If the args.recursive option is set:
217 #      1. Copies all input collections
218 #           * For each component in the pipeline, include all collections
219 #             listed as job dependencies for that component)
220 #      2. Copy docker images
221 #      3. Copy git repositories
222 #      4. Copy the pipeline template
223 #
224 #    The only changes made to the copied pipeline instance are:
225 #      1. The original pipeline instance UUID is preserved in
226 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
227 #      2. The pipeline_template_uuid is changed to the new template uuid.
228 #      3. The owner_uuid of the instance is changed to the user who
229 #         copied it.
230 #
231 def copy_pipeline_instance(pi_uuid, src, dst, args):
232     # Fetch the pipeline instance record.
233     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
234
235     if args.recursive:
236         check_git_availability()
237
238         if not args.dst_git_repo:
239             abort('--dst-git-repo is required when copying a pipeline recursively.')
240         # Copy the pipeline template and save the copied template.
241         if pi.get('pipeline_template_uuid', None):
242             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
243                                         src, dst, args)
244
245         # Copy input collections, docker images and git repos.
246         pi = copy_collections(pi, src, dst, args)
247         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
248         copy_docker_images(pi, src, dst, args)
249
250         # Update the fields of the pipeline instance with the copied
251         # pipeline template.
252         if pi.get('pipeline_template_uuid', None):
253             pi['pipeline_template_uuid'] = pt['uuid']
254
255     else:
256         # not recursive
257         logger.info("Copying only pipeline instance %s.", pi_uuid)
258         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
259
260     # Update the pipeline instance properties, and create the new
261     # instance at dst.
262     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
263     pi['description'] = "Pipeline copied from {}\n\n{}".format(
264         pi_uuid,
265         pi['description'] if pi.get('description', None) else '')
266
267     pi['owner_uuid'] = args.project_uuid
268
269     del pi['uuid']
270
271     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
272     return new_pi
273
274 def filter_iter(arg):
275     """Iterate a filter string-or-list.
276
277     Pass in a filter field that can either be a string or list.
278     This will iterate elements as if the field had been written as a list.
279     """
280     if isinstance(arg, basestring):
281         return iter((arg,))
282     else:
283         return iter(arg)
284
285 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
286     """Update a single repository filter in-place for the destination.
287
288     If the filter checks that the repository is src_repository, it is
289     updated to check that the repository is dst_repository.  If it does
290     anything else, this function raises ValueError.
291     """
292     if src_repository is None:
293         raise ValueError("component does not specify a source repository")
294     elif dst_repository is None:
295         raise ValueError("no destination repository specified to update repository filter")
296     elif repo_filter[1:] == ['=', src_repository]:
297         repo_filter[2] = dst_repository
298     elif repo_filter[1:] == ['in', [src_repository]]:
299         repo_filter[2] = [dst_repository]
300     else:
301         raise ValueError("repository filter is not a simple source match")
302
303 def migrate_script_version_filter(version_filter):
304     """Update a single script_version filter in-place for the destination.
305
306     Currently this function checks that all the filter operands are Git
307     commit hashes.  If they're not, it raises ValueError to indicate that
308     the filter is not portable.  It could be extended to make other
309     transformations in the future.
310     """
311     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
312         raise ValueError("script_version filter is not limited to commit hashes")
313
314 def attr_filtered(filter_, *attr_names):
315     """Return True if filter_ applies to any of attr_names, else False."""
316     return any((name == 'any') or (name in attr_names)
317                for name in filter_iter(filter_[0]))
318
319 @contextlib.contextmanager
320 def exception_handler(handler, *exc_types):
321     """If any exc_types are raised in the block, call handler on the exception."""
322     try:
323         yield
324     except exc_types as error:
325         handler(error)
326
327 def migrate_components_filters(template_components, dst_git_repo):
328     """Update template component filters in-place for the destination.
329
330     template_components is a dictionary of components in a pipeline template.
331     This method walks over each component's filters, and updates them to have
332     identical semantics on the destination cluster.  It returns a list of
333     error strings that describe what filters could not be updated safely.
334
335     dst_git_repo is the name of the destination Git repository, which can
336     be None if that is not known.
337     """
338     errors = []
339     for cname, cspec in template_components.iteritems():
340         def add_error(errmsg):
341             errors.append("{}: {}".format(cname, errmsg))
342         if not isinstance(cspec, dict):
343             add_error("value is not a component definition")
344             continue
345         src_repository = cspec.get('repository')
346         filters = cspec.get('filters', [])
347         if not isinstance(filters, list):
348             add_error("filters are not a list")
349             continue
350         for cfilter in filters:
351             if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
352                 add_error("malformed filter {!r}".format(cfilter))
353                 continue
354             if attr_filtered(cfilter, 'repository'):
355                 with exception_handler(add_error, ValueError):
356                     migrate_repository_filter(cfilter, src_repository, dst_git_repo)
357             if attr_filtered(cfilter, 'script_version'):
358                 with exception_handler(add_error, ValueError):
359                     migrate_script_version_filter(cfilter)
360     return errors
361
362 # copy_pipeline_template(pt_uuid, src, dst, args)
363 #
364 #    Copies a pipeline template identified by pt_uuid from src to dst.
365 #
366 #    If args.recursive is True, also copy any collections, docker
367 #    images and git repositories that this template references.
368 #
369 #    The owner_uuid of the new template is changed to that of the user
370 #    who copied the template.
371 #
372 #    Returns the copied pipeline template object.
373 #
374 def copy_pipeline_template(pt_uuid, src, dst, args):
375     # fetch the pipeline template from the source instance
376     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
377
378     if not args.force_filters:
379         filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
380         if filter_errors:
381             abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
382                   "\n".join(filter_errors))
383
384     if args.recursive:
385         check_git_availability()
386
387         if not args.dst_git_repo:
388             abort('--dst-git-repo is required when copying a pipeline recursively.')
389         # Copy input collections, docker images and git repos.
390         pt = copy_collections(pt, src, dst, args)
391         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
392         copy_docker_images(pt, src, dst, args)
393
394     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
395         pt_uuid,
396         pt['description'] if pt.get('description', None) else '')
397     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
398     del pt['uuid']
399
400     pt['owner_uuid'] = args.project_uuid
401
402     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
403
404 # copy_collections(obj, src, dst, args)
405 #
406 #    Recursively copies all collections referenced by 'obj' from src
407 #    to dst.  obj may be a dict or a list, in which case we run
408 #    copy_collections on every value it contains. If it is a string,
409 #    search it for any substring that matches a collection hash or uuid
410 #    (this will find hidden references to collections like
411 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
412 #
413 #    Returns a copy of obj with any old collection uuids replaced by
414 #    the new ones.
415 #
416 def copy_collections(obj, src, dst, args):
417
418     def copy_collection_fn(collection_match):
419         """Helper function for regex substitution: copies a single collection,
420         identified by the collection_match MatchObject, to the
421         destination.  Returns the destination collection uuid (or the
422         portable data hash if that's what src_id is).
423
424         """
425         src_id = collection_match.group(0)
426         if src_id not in collections_copied:
427             dst_col = copy_collection(src_id, src, dst, args)
428             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
429                 collections_copied[src_id] = src_id
430             else:
431                 collections_copied[src_id] = dst_col['uuid']
432         return collections_copied[src_id]
433
434     if isinstance(obj, basestring):
435         # Copy any collections identified in this string to dst, replacing
436         # them with the dst uuids as necessary.
437         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
438         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
439         return obj
440     elif isinstance(obj, dict):
441         return type(obj)((v, copy_collections(obj[v], src, dst, args))
442                          for v in obj)
443     elif isinstance(obj, list):
444         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
445     return obj
446
447 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
448     """Copy a job's script to the destination repository, and update its record.
449
450     Given a jobspec dictionary, this function finds the referenced script from
451     src and copies it to dst and dst_repo.  It also updates jobspec in place to
452     refer to names on the destination.
453     """
454     repo = jobspec.get('repository')
455     if repo is None:
456         return
457     # script_version is the "script_version" parameter from the source
458     # component or job.  If no script_version was supplied in the
459     # component or job, it is a mistake in the pipeline, but for the
460     # purposes of copying the repository, default to "master".
461     script_version = jobspec.get('script_version') or 'master'
462     script_key = (repo, script_version)
463     if script_key not in scripts_copied:
464         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
465         scripts_copied.add(script_key)
466     jobspec['repository'] = dst_repo
467     repo_dir = local_repo_dir[repo]
468     for version_key in ['script_version', 'supplied_script_version']:
469         if version_key in jobspec:
470             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
471
472 # copy_git_repos(p, src, dst, dst_repo, args)
473 #
474 #    Copies all git repositories referenced by pipeline instance or
475 #    template 'p' from src to dst.
476 #
477 #    For each component c in the pipeline:
478 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
479 #      * Rename script versions:
480 #          * c['script_version']
481 #          * c['job']['script_version']
482 #          * c['job']['supplied_script_version']
483 #        to the commit hashes they resolve to, since any symbolic
484 #        names (tags, branches) are not preserved in the destination repo.
485 #
486 #    The pipeline object is updated in place with the new repository
487 #    names.  The return value is undefined.
488 #
489 def copy_git_repos(p, src, dst, dst_repo, args):
490     for component in p['components'].itervalues():
491         migrate_jobspec(component, src, dst, dst_repo, args)
492         if 'job' in component:
493             migrate_jobspec(component['job'], src, dst, dst_repo, args)
494
495 def total_collection_size(manifest_text):
496     """Return the total number of bytes in this collection (excluding
497     duplicate blocks)."""
498
499     total_bytes = 0
500     locators_seen = {}
501     for line in manifest_text.splitlines():
502         words = line.split()
503         for word in words[1:]:
504             try:
505                 loc = arvados.KeepLocator(word)
506             except ValueError:
507                 continue  # this word isn't a locator, skip it
508             if loc.md5sum not in locators_seen:
509                 locators_seen[loc.md5sum] = True
510                 total_bytes += loc.size
511
512     return total_bytes
513
514 def create_collection_from(c, src, dst, args):
515     """Create a new collection record on dst, and copy Docker metadata if
516     available."""
517
518     collection_uuid = c['uuid']
519     del c['uuid']
520
521     if not c["name"]:
522         c['name'] = "copied from " + collection_uuid
523
524     if 'properties' in c:
525         del c['properties']
526
527     c['owner_uuid'] = args.project_uuid
528
529     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
530
531     # Create docker_image_repo+tag and docker_image_hash links
532     # at the destination.
533     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
534         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
535
536         for src_link in docker_links:
537             body = {key: src_link[key]
538                     for key in ['link_class', 'name', 'properties']}
539             body['head_uuid'] = dst_collection['uuid']
540             body['owner_uuid'] = args.project_uuid
541
542             lk = dst.links().create(body=body).execute(num_retries=args.retries)
543             logger.debug('created dst link {}'.format(lk))
544
545     return dst_collection
546
547 # copy_collection(obj_uuid, src, dst, args)
548 #
549 #    Copies the collection identified by obj_uuid from src to dst.
550 #    Returns the collection object created at dst.
551 #
552 #    If args.progress is True, produce a human-friendly progress
553 #    report.
554 #
555 #    If a collection with the desired portable_data_hash already
556 #    exists at dst, and args.force is False, copy_collection returns
557 #    the existing collection without copying any blocks.  Otherwise
558 #    (if no collection exists or if args.force is True)
559 #    copy_collection copies all of the collection data blocks from src
560 #    to dst.
561 #
562 #    For this application, it is critical to preserve the
563 #    collection's manifest hash, which is not guaranteed with the
564 #    arvados.CollectionReader and arvados.CollectionWriter classes.
565 #    Copying each block in the collection manually, followed by
566 #    the manifest block, ensures that the collection's manifest
567 #    hash will not change.
568 #
569 def copy_collection(obj_uuid, src, dst, args):
570     if arvados.util.keep_locator_pattern.match(obj_uuid):
571         # If the obj_uuid is a portable data hash, it might not be uniquely
572         # identified with a particular collection.  As a result, it is
573         # ambigious as to what name to use for the copy.  Apply some heuristics
574         # to pick which collection to get the name from.
575         srccol = src.collections().list(
576             filters=[['portable_data_hash', '=', obj_uuid]],
577             order="created_at asc"
578             ).execute(num_retries=args.retries)
579
580         items = srccol.get("items")
581
582         if not items:
583             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
584             return
585
586         c = None
587
588         if len(items) == 1:
589             # There's only one collection with the PDH, so use that.
590             c = items[0]
591         if not c:
592             # See if there is a collection that's in the same project
593             # as the root item (usually a pipeline) being copied.
594             for i in items:
595                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
596                     c = i
597                     break
598         if not c:
599             # Didn't find any collections located in the same project, so
600             # pick the oldest collection that has a name assigned to it.
601             for i in items:
602                 if i.get("name"):
603                     c = i
604                     break
605         if not c:
606             # None of the collections have names (?!), so just pick the
607             # first one.
608             c = items[0]
609
610         # list() doesn't return manifest text (and we don't want it to,
611         # because we don't need the same maninfest text sent to us 50
612         # times) so go and retrieve the collection object directly
613         # which will include the manifest text.
614         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
615     else:
616         # Assume this is an actual collection uuid, so fetch it directly.
617         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
618
619     # If a collection with this hash already exists at the
620     # destination, and 'force' is not true, just return that
621     # collection.
622     if not args.force:
623         if 'portable_data_hash' in c:
624             colhash = c['portable_data_hash']
625         else:
626             colhash = c['uuid']
627         dstcol = dst.collections().list(
628             filters=[['portable_data_hash', '=', colhash]]
629         ).execute(num_retries=args.retries)
630         if dstcol['items_available'] > 0:
631             for d in dstcol['items']:
632                 if ((args.project_uuid == d['owner_uuid']) and
633                     (c.get('name') == d['name']) and
634                     (c['portable_data_hash'] == d['portable_data_hash'])):
635                     return d
636             c['manifest_text'] = dst.collections().get(
637                 uuid=dstcol['items'][0]['uuid']
638             ).execute(num_retries=args.retries)['manifest_text']
639             return create_collection_from(c, src, dst, args)
640
641     # Fetch the collection's manifest.
642     manifest = c['manifest_text']
643     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
644
645     # Copy each block from src_keep to dst_keep.
646     # Use the newly signed locators returned from dst_keep to build
647     # a new manifest as we go.
648     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
649     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
650     dst_manifest = ""
651     dst_locators = {}
652     bytes_written = 0
653     bytes_expected = total_collection_size(manifest)
654     if args.progress:
655         progress_writer = ProgressWriter(human_progress)
656     else:
657         progress_writer = None
658
659     for line in manifest.splitlines():
660         words = line.split()
661         dst_manifest += words[0]
662         for word in words[1:]:
663             try:
664                 loc = arvados.KeepLocator(word)
665             except ValueError:
666                 # If 'word' can't be parsed as a locator,
667                 # presume it's a filename.
668                 dst_manifest += ' ' + word
669                 continue
670             blockhash = loc.md5sum
671             # copy this block if we haven't seen it before
672             # (otherwise, just reuse the existing dst_locator)
673             if blockhash not in dst_locators:
674                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
675                 if progress_writer:
676                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
677                 data = src_keep.get(word)
678                 dst_locator = dst_keep.put(data)
679                 dst_locators[blockhash] = dst_locator
680                 bytes_written += loc.size
681             dst_manifest += ' ' + dst_locators[blockhash]
682         dst_manifest += "\n"
683
684     if progress_writer:
685         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
686         progress_writer.finish()
687
688     # Copy the manifest and save the collection.
689     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
690
691     c['manifest_text'] = dst_manifest
692     return create_collection_from(c, src, dst, args)
693
694 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
695     r = api.repositories().list(
696         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
697     if r['items_available'] != 1:
698         raise Exception('cannot identify repo {}; {} repos found'
699                         .format(repo_name, r['items_available']))
700
701     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
702     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
703     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
704
705     priority = https_url + other_url + http_url
706
707     git_config = []
708     git_url = None
709     for url in priority:
710         if url.startswith("http"):
711             u = urlparse.urlsplit(url)
712             baseurl = urlparse.urlunsplit((u.scheme, u.netloc, "", "", ""))
713             git_config = ["-c", "credential.%s/.username=none" % baseurl,
714                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
715         else:
716             git_config = []
717
718         try:
719             logger.debug("trying %s", url)
720             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
721                                       env={"HOME": os.environ["HOME"],
722                                            "ARVADOS_API_TOKEN": api.api_token,
723                                            "GIT_ASKPASS": "/bin/false"})
724         except arvados.errors.CommandFailedError:
725             pass
726         else:
727             git_url = url
728             break
729
730     if not git_url:
731         raise Exception('Cannot access git repository, tried {}'
732                         .format(priority))
733
734     if git_url.startswith("http:"):
735         if allow_insecure_http:
736             logger.warn("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
737         else:
738             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
739
740     return (git_url, git_config)
741
742
743 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
744 #
745 #    Copies commits from git repository 'src_git_repo' on Arvados
746 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
747 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
748 #    or "jsmith")
749 #
750 #    All commits will be copied to a destination branch named for the
751 #    source repository URL.
752 #
753 #    The destination repository must already exist.
754 #
755 #    The user running this command must be authenticated
756 #    to both repositories.
757 #
758 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
759     # Identify the fetch and push URLs for the git repositories.
760
761     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
762     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
763
764     logger.debug('src_git_url: {}'.format(src_git_url))
765     logger.debug('dst_git_url: {}'.format(dst_git_url))
766
767     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
768
769     # Copy git commits from src repo to dst repo.
770     if src_git_repo not in local_repo_dir:
771         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
772         arvados.util.run_command(
773             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
774              local_repo_dir[src_git_repo]],
775             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
776             env={"HOME": os.environ["HOME"],
777                  "ARVADOS_API_TOKEN": src.api_token,
778                  "GIT_ASKPASS": "/bin/false"})
779         arvados.util.run_command(
780             ["git", "remote", "add", "dst", dst_git_url],
781             cwd=local_repo_dir[src_git_repo])
782     arvados.util.run_command(
783         ["git", "branch", dst_branch, script_version],
784         cwd=local_repo_dir[src_git_repo])
785     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
786                              cwd=local_repo_dir[src_git_repo],
787                              env={"HOME": os.environ["HOME"],
788                                   "ARVADOS_API_TOKEN": dst.api_token,
789                                   "GIT_ASKPASS": "/bin/false"})
790
791 def copy_docker_images(pipeline, src, dst, args):
792     """Copy any docker images named in the pipeline components'
793     runtime_constraints field from src to dst."""
794
795     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
796     for c_name, c_info in pipeline['components'].iteritems():
797         if ('runtime_constraints' in c_info and
798             'docker_image' in c_info['runtime_constraints']):
799             copy_docker_image(
800                 c_info['runtime_constraints']['docker_image'],
801                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
802                 src, dst, args)
803
804
805 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
806     """Copy the docker image identified by docker_image and
807     docker_image_tag from src to dst. Create appropriate
808     docker_image_repo+tag and docker_image_hash links at dst.
809
810     """
811
812     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
813
814     # Find the link identifying this docker image.
815     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
816         src, args.retries, docker_image, docker_image_tag)
817     if docker_image_list:
818         image_uuid, image_info = docker_image_list[0]
819         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
820
821         # Copy the collection it refers to.
822         dst_image_col = copy_collection(image_uuid, src, dst, args)
823     elif arvados.util.keep_locator_pattern.match(docker_image):
824         dst_image_col = copy_collection(docker_image, src, dst, args)
825     else:
826         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
827
828 # git_rev_parse(rev, repo)
829 #
830 #    Returns the 40-character commit hash corresponding to 'rev' in
831 #    git repository 'repo' (which must be the path of a local git
832 #    repository)
833 #
834 def git_rev_parse(rev, repo):
835     gitout, giterr = arvados.util.run_command(
836         ['git', 'rev-parse', rev], cwd=repo)
837     return gitout.strip()
838
839 # uuid_type(api, object_uuid)
840 #
841 #    Returns the name of the class that object_uuid belongs to, based on
842 #    the second field of the uuid.  This function consults the api's
843 #    schema to identify the object class.
844 #
845 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
846 #
847 #    Special case: if handed a Keep locator hash, return 'Collection'.
848 #
849 def uuid_type(api, object_uuid):
850     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
851         return 'Collection'
852     p = object_uuid.split('-')
853     if len(p) == 3:
854         type_prefix = p[1]
855         for k in api._schema.schemas:
856             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
857             if type_prefix == obj_class:
858                 return k
859     return None
860
861 def abort(msg, code=1):
862     logger.info("arv-copy: %s", msg)
863     exit(code)
864
865
866 # Code for reporting on the progress of a collection upload.
867 # Stolen from arvados.commands.put.ArvPutCollectionWriter
868 # TODO(twp): figure out how to refactor into a shared library
869 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
870 # code)
871
872 def machine_progress(obj_uuid, bytes_written, bytes_expected):
873     return "{} {}: {} {} written {} total\n".format(
874         sys.argv[0],
875         os.getpid(),
876         obj_uuid,
877         bytes_written,
878         -1 if (bytes_expected is None) else bytes_expected)
879
880 def human_progress(obj_uuid, bytes_written, bytes_expected):
881     if bytes_expected:
882         return "\r{}: {}M / {}M {:.1%} ".format(
883             obj_uuid,
884             bytes_written >> 20, bytes_expected >> 20,
885             float(bytes_written) / bytes_expected)
886     else:
887         return "\r{}: {} ".format(obj_uuid, bytes_written)
888
889 class ProgressWriter(object):
890     _progress_func = None
891     outfile = sys.stderr
892
893     def __init__(self, progress_func):
894         self._progress_func = progress_func
895
896     def report(self, obj_uuid, bytes_written, bytes_expected):
897         if self._progress_func is not None:
898             self.outfile.write(
899                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
900
901     def finish(self):
902         self.outfile.write("\n")
903
904 if __name__ == '__main__':
905     main()