8460: Merge branch 'master' into 8460-websocket-go
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import contextlib
21 import getpass
22 import os
23 import re
24 import shutil
25 import sys
26 import logging
27 import tempfile
28 import urlparse
29
30 import arvados
31 import arvados.config
32 import arvados.keep
33 import arvados.util
34 import arvados.commands._util as arv_cmd
35 import arvados.commands.keepdocker
36
37 from arvados.api import OrderedJsonModel
38 from arvados._version import __version__
39
40 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
41
42 logger = logging.getLogger('arvados.arv-copy')
43
44 # local_repo_dir records which git repositories from the Arvados source
45 # instance have been checked out locally during this run, and to which
46 # directories.
47 # e.g. if repository 'twp' from src_arv has been cloned into
48 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
49 #
50 local_repo_dir = {}
51
52 # List of collections that have been copied in this session, and their
53 # destination collection UUIDs.
54 collections_copied = {}
55
56 # Set of (repository, script_version) two-tuples of commits copied in git.
57 scripts_copied = set()
58
59 # The owner_uuid of the object being copied
60 src_owner_uuid = None
61
62 def main():
63     copy_opts = argparse.ArgumentParser(add_help=False)
64
65     copy_opts.add_argument(
66         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
67         help='Print version and exit.')
68     copy_opts.add_argument(
69         '-v', '--verbose', dest='verbose', action='store_true',
70         help='Verbose output.')
71     copy_opts.add_argument(
72         '--progress', dest='progress', action='store_true',
73         help='Report progress on copying collections. (default)')
74     copy_opts.add_argument(
75         '--no-progress', dest='progress', action='store_false',
76         help='Do not report progress on copying collections.')
77     copy_opts.add_argument(
78         '-f', '--force', dest='force', action='store_true',
79         help='Perform copy even if the object appears to exist at the remote destination.')
80     copy_opts.add_argument(
81         '--force-filters', action='store_true', default=False,
82         help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
83     copy_opts.add_argument(
84         '--src', dest='source_arvados', required=True,
85         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
86     copy_opts.add_argument(
87         '--dst', dest='destination_arvados', required=True,
88         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
89     copy_opts.add_argument(
90         '--recursive', dest='recursive', action='store_true',
91         help='Recursively copy any dependencies for this object. (default)')
92     copy_opts.add_argument(
93         '--no-recursive', dest='recursive', action='store_false',
94         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
95     copy_opts.add_argument(
96         '--dst-git-repo', dest='dst_git_repo',
97         help='The name of the destination git repository. Required when copying a pipeline recursively.')
98     copy_opts.add_argument(
99         '--project-uuid', dest='project_uuid',
100         help='The UUID of the project at the destination to which the pipeline should be copied.')
101     copy_opts.add_argument(
102         '--allow-git-http-src', action="store_true",
103         help='Allow cloning git repositories over insecure http')
104     copy_opts.add_argument(
105         '--allow-git-http-dst', action="store_true",
106         help='Allow pushing git repositories over insecure http')
107
108     copy_opts.add_argument(
109         'object_uuid',
110         help='The UUID of the object to be copied.')
111     copy_opts.set_defaults(progress=True)
112     copy_opts.set_defaults(recursive=True)
113
114     parser = argparse.ArgumentParser(
115         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
116         parents=[copy_opts, arv_cmd.retry_opt])
117     args = parser.parse_args()
118
119     if args.verbose:
120         logger.setLevel(logging.DEBUG)
121     else:
122         logger.setLevel(logging.INFO)
123
124     # Create API clients for the source and destination instances
125     src_arv = api_for_instance(args.source_arvados)
126     dst_arv = api_for_instance(args.destination_arvados)
127
128     if not args.project_uuid:
129         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
130
131     # Identify the kind of object we have been given, and begin copying.
132     t = uuid_type(src_arv, args.object_uuid)
133     if t == 'Collection':
134         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
135         result = copy_collection(args.object_uuid,
136                                  src_arv, dst_arv,
137                                  args)
138     elif t == 'PipelineInstance':
139         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
140         result = copy_pipeline_instance(args.object_uuid,
141                                         src_arv, dst_arv,
142                                         args)
143     elif t == 'PipelineTemplate':
144         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
145         result = copy_pipeline_template(args.object_uuid,
146                                         src_arv, dst_arv, args)
147     else:
148         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
149
150     # Clean up any outstanding temp git repositories.
151     for d in local_repo_dir.values():
152         shutil.rmtree(d, ignore_errors=True)
153
154     # If no exception was thrown and the response does not have an
155     # error_token field, presume success
156     if 'error_token' in result or 'uuid' not in result:
157         logger.error("API server returned an error result: {}".format(result))
158         exit(1)
159
160     logger.info("")
161     logger.info("Success: created copy with uuid {}".format(result['uuid']))
162     exit(0)
163
164 def set_src_owner_uuid(resource, uuid, args):
165     global src_owner_uuid
166     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
167     src_owner_uuid = c.get("owner_uuid")
168
169 # api_for_instance(instance_name)
170 #
171 #     Creates an API client for the Arvados instance identified by
172 #     instance_name.
173 #
174 #     If instance_name contains a slash, it is presumed to be a path
175 #     (either local or absolute) to a file with Arvados configuration
176 #     settings.
177 #
178 #     Otherwise, it is presumed to be the name of a file in
179 #     $HOME/.config/arvados/instance_name.conf
180 #
181 def api_for_instance(instance_name):
182     if '/' in instance_name:
183         config_file = instance_name
184     else:
185         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
186
187     try:
188         cfg = arvados.config.load(config_file)
189     except (IOError, OSError) as e:
190         abort(("Could not open config file {}: {}\n" +
191                "You must make sure that your configuration tokens\n" +
192                "for Arvados instance {} are in {} and that this\n" +
193                "file is readable.").format(
194                    config_file, e, instance_name, config_file))
195
196     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
197         api_is_insecure = (
198             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
199                 ['1', 't', 'true', 'y', 'yes']))
200         client = arvados.api('v1',
201                              host=cfg['ARVADOS_API_HOST'],
202                              token=cfg['ARVADOS_API_TOKEN'],
203                              insecure=api_is_insecure,
204                              model=OrderedJsonModel())
205     else:
206         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
207     return client
208
209 # Check if git is available
210 def check_git_availability():
211     try:
212         arvados.util.run_command(['git', '--help'])
213     except Exception:
214         abort('git command is not available. Please ensure git is installed.')
215
216 # copy_pipeline_instance(pi_uuid, src, dst, args)
217 #
218 #    Copies a pipeline instance identified by pi_uuid from src to dst.
219 #
220 #    If the args.recursive option is set:
221 #      1. Copies all input collections
222 #           * For each component in the pipeline, include all collections
223 #             listed as job dependencies for that component)
224 #      2. Copy docker images
225 #      3. Copy git repositories
226 #      4. Copy the pipeline template
227 #
228 #    The only changes made to the copied pipeline instance are:
229 #      1. The original pipeline instance UUID is preserved in
230 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
231 #      2. The pipeline_template_uuid is changed to the new template uuid.
232 #      3. The owner_uuid of the instance is changed to the user who
233 #         copied it.
234 #
235 def copy_pipeline_instance(pi_uuid, src, dst, args):
236     # Fetch the pipeline instance record.
237     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
238
239     if args.recursive:
240         check_git_availability()
241
242         if not args.dst_git_repo:
243             abort('--dst-git-repo is required when copying a pipeline recursively.')
244         # Copy the pipeline template and save the copied template.
245         if pi.get('pipeline_template_uuid', None):
246             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
247                                         src, dst, args)
248
249         # Copy input collections, docker images and git repos.
250         pi = copy_collections(pi, src, dst, args)
251         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
252         copy_docker_images(pi, src, dst, args)
253
254         # Update the fields of the pipeline instance with the copied
255         # pipeline template.
256         if pi.get('pipeline_template_uuid', None):
257             pi['pipeline_template_uuid'] = pt['uuid']
258
259     else:
260         # not recursive
261         logger.info("Copying only pipeline instance %s.", pi_uuid)
262         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
263
264     # Update the pipeline instance properties, and create the new
265     # instance at dst.
266     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
267     pi['description'] = "Pipeline copied from {}\n\n{}".format(
268         pi_uuid,
269         pi['description'] if pi.get('description', None) else '')
270
271     pi['owner_uuid'] = args.project_uuid
272
273     del pi['uuid']
274
275     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
276     return new_pi
277
278 def filter_iter(arg):
279     """Iterate a filter string-or-list.
280
281     Pass in a filter field that can either be a string or list.
282     This will iterate elements as if the field had been written as a list.
283     """
284     if isinstance(arg, basestring):
285         return iter((arg,))
286     else:
287         return iter(arg)
288
289 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
290     """Update a single repository filter in-place for the destination.
291
292     If the filter checks that the repository is src_repository, it is
293     updated to check that the repository is dst_repository.  If it does
294     anything else, this function raises ValueError.
295     """
296     if src_repository is None:
297         raise ValueError("component does not specify a source repository")
298     elif dst_repository is None:
299         raise ValueError("no destination repository specified to update repository filter")
300     elif repo_filter[1:] == ['=', src_repository]:
301         repo_filter[2] = dst_repository
302     elif repo_filter[1:] == ['in', [src_repository]]:
303         repo_filter[2] = [dst_repository]
304     else:
305         raise ValueError("repository filter is not a simple source match")
306
307 def migrate_script_version_filter(version_filter):
308     """Update a single script_version filter in-place for the destination.
309
310     Currently this function checks that all the filter operands are Git
311     commit hashes.  If they're not, it raises ValueError to indicate that
312     the filter is not portable.  It could be extended to make other
313     transformations in the future.
314     """
315     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
316         raise ValueError("script_version filter is not limited to commit hashes")
317
318 def attr_filtered(filter_, *attr_names):
319     """Return True if filter_ applies to any of attr_names, else False."""
320     return any((name == 'any') or (name in attr_names)
321                for name in filter_iter(filter_[0]))
322
323 @contextlib.contextmanager
324 def exception_handler(handler, *exc_types):
325     """If any exc_types are raised in the block, call handler on the exception."""
326     try:
327         yield
328     except exc_types as error:
329         handler(error)
330
331 def migrate_components_filters(template_components, dst_git_repo):
332     """Update template component filters in-place for the destination.
333
334     template_components is a dictionary of components in a pipeline template.
335     This method walks over each component's filters, and updates them to have
336     identical semantics on the destination cluster.  It returns a list of
337     error strings that describe what filters could not be updated safely.
338
339     dst_git_repo is the name of the destination Git repository, which can
340     be None if that is not known.
341     """
342     errors = []
343     for cname, cspec in template_components.iteritems():
344         def add_error(errmsg):
345             errors.append("{}: {}".format(cname, errmsg))
346         if not isinstance(cspec, dict):
347             add_error("value is not a component definition")
348             continue
349         src_repository = cspec.get('repository')
350         filters = cspec.get('filters', [])
351         if not isinstance(filters, list):
352             add_error("filters are not a list")
353             continue
354         for cfilter in filters:
355             if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
356                 add_error("malformed filter {!r}".format(cfilter))
357                 continue
358             if attr_filtered(cfilter, 'repository'):
359                 with exception_handler(add_error, ValueError):
360                     migrate_repository_filter(cfilter, src_repository, dst_git_repo)
361             if attr_filtered(cfilter, 'script_version'):
362                 with exception_handler(add_error, ValueError):
363                     migrate_script_version_filter(cfilter)
364     return errors
365
366 # copy_pipeline_template(pt_uuid, src, dst, args)
367 #
368 #    Copies a pipeline template identified by pt_uuid from src to dst.
369 #
370 #    If args.recursive is True, also copy any collections, docker
371 #    images and git repositories that this template references.
372 #
373 #    The owner_uuid of the new template is changed to that of the user
374 #    who copied the template.
375 #
376 #    Returns the copied pipeline template object.
377 #
378 def copy_pipeline_template(pt_uuid, src, dst, args):
379     # fetch the pipeline template from the source instance
380     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
381
382     if not args.force_filters:
383         filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
384         if filter_errors:
385             abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
386                   "\n".join(filter_errors))
387
388     if args.recursive:
389         check_git_availability()
390
391         if not args.dst_git_repo:
392             abort('--dst-git-repo is required when copying a pipeline recursively.')
393         # Copy input collections, docker images and git repos.
394         pt = copy_collections(pt, src, dst, args)
395         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
396         copy_docker_images(pt, src, dst, args)
397
398     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
399         pt_uuid,
400         pt['description'] if pt.get('description', None) else '')
401     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
402     del pt['uuid']
403
404     pt['owner_uuid'] = args.project_uuid
405
406     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
407
408 # copy_collections(obj, src, dst, args)
409 #
410 #    Recursively copies all collections referenced by 'obj' from src
411 #    to dst.  obj may be a dict or a list, in which case we run
412 #    copy_collections on every value it contains. If it is a string,
413 #    search it for any substring that matches a collection hash or uuid
414 #    (this will find hidden references to collections like
415 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
416 #
417 #    Returns a copy of obj with any old collection uuids replaced by
418 #    the new ones.
419 #
420 def copy_collections(obj, src, dst, args):
421
422     def copy_collection_fn(collection_match):
423         """Helper function for regex substitution: copies a single collection,
424         identified by the collection_match MatchObject, to the
425         destination.  Returns the destination collection uuid (or the
426         portable data hash if that's what src_id is).
427
428         """
429         src_id = collection_match.group(0)
430         if src_id not in collections_copied:
431             dst_col = copy_collection(src_id, src, dst, args)
432             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
433                 collections_copied[src_id] = src_id
434             else:
435                 collections_copied[src_id] = dst_col['uuid']
436         return collections_copied[src_id]
437
438     if isinstance(obj, basestring):
439         # Copy any collections identified in this string to dst, replacing
440         # them with the dst uuids as necessary.
441         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
442         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
443         return obj
444     elif isinstance(obj, dict):
445         return type(obj)((v, copy_collections(obj[v], src, dst, args))
446                          for v in obj)
447     elif isinstance(obj, list):
448         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
449     return obj
450
451 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
452     """Copy a job's script to the destination repository, and update its record.
453
454     Given a jobspec dictionary, this function finds the referenced script from
455     src and copies it to dst and dst_repo.  It also updates jobspec in place to
456     refer to names on the destination.
457     """
458     repo = jobspec.get('repository')
459     if repo is None:
460         return
461     # script_version is the "script_version" parameter from the source
462     # component or job.  If no script_version was supplied in the
463     # component or job, it is a mistake in the pipeline, but for the
464     # purposes of copying the repository, default to "master".
465     script_version = jobspec.get('script_version') or 'master'
466     script_key = (repo, script_version)
467     if script_key not in scripts_copied:
468         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
469         scripts_copied.add(script_key)
470     jobspec['repository'] = dst_repo
471     repo_dir = local_repo_dir[repo]
472     for version_key in ['script_version', 'supplied_script_version']:
473         if version_key in jobspec:
474             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
475
476 # copy_git_repos(p, src, dst, dst_repo, args)
477 #
478 #    Copies all git repositories referenced by pipeline instance or
479 #    template 'p' from src to dst.
480 #
481 #    For each component c in the pipeline:
482 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
483 #      * Rename script versions:
484 #          * c['script_version']
485 #          * c['job']['script_version']
486 #          * c['job']['supplied_script_version']
487 #        to the commit hashes they resolve to, since any symbolic
488 #        names (tags, branches) are not preserved in the destination repo.
489 #
490 #    The pipeline object is updated in place with the new repository
491 #    names.  The return value is undefined.
492 #
493 def copy_git_repos(p, src, dst, dst_repo, args):
494     for component in p['components'].itervalues():
495         migrate_jobspec(component, src, dst, dst_repo, args)
496         if 'job' in component:
497             migrate_jobspec(component['job'], src, dst, dst_repo, args)
498
499 def total_collection_size(manifest_text):
500     """Return the total number of bytes in this collection (excluding
501     duplicate blocks)."""
502
503     total_bytes = 0
504     locators_seen = {}
505     for line in manifest_text.splitlines():
506         words = line.split()
507         for word in words[1:]:
508             try:
509                 loc = arvados.KeepLocator(word)
510             except ValueError:
511                 continue  # this word isn't a locator, skip it
512             if loc.md5sum not in locators_seen:
513                 locators_seen[loc.md5sum] = True
514                 total_bytes += loc.size
515
516     return total_bytes
517
518 def create_collection_from(c, src, dst, args):
519     """Create a new collection record on dst, and copy Docker metadata if
520     available."""
521
522     collection_uuid = c['uuid']
523     del c['uuid']
524
525     if not c["name"]:
526         c['name'] = "copied from " + collection_uuid
527
528     if 'properties' in c:
529         del c['properties']
530
531     c['owner_uuid'] = args.project_uuid
532
533     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
534
535     # Create docker_image_repo+tag and docker_image_hash links
536     # at the destination.
537     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
538         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
539
540         for src_link in docker_links:
541             body = {key: src_link[key]
542                     for key in ['link_class', 'name', 'properties']}
543             body['head_uuid'] = dst_collection['uuid']
544             body['owner_uuid'] = args.project_uuid
545
546             lk = dst.links().create(body=body).execute(num_retries=args.retries)
547             logger.debug('created dst link {}'.format(lk))
548
549     return dst_collection
550
551 # copy_collection(obj_uuid, src, dst, args)
552 #
553 #    Copies the collection identified by obj_uuid from src to dst.
554 #    Returns the collection object created at dst.
555 #
556 #    If args.progress is True, produce a human-friendly progress
557 #    report.
558 #
559 #    If a collection with the desired portable_data_hash already
560 #    exists at dst, and args.force is False, copy_collection returns
561 #    the existing collection without copying any blocks.  Otherwise
562 #    (if no collection exists or if args.force is True)
563 #    copy_collection copies all of the collection data blocks from src
564 #    to dst.
565 #
566 #    For this application, it is critical to preserve the
567 #    collection's manifest hash, which is not guaranteed with the
568 #    arvados.CollectionReader and arvados.CollectionWriter classes.
569 #    Copying each block in the collection manually, followed by
570 #    the manifest block, ensures that the collection's manifest
571 #    hash will not change.
572 #
573 def copy_collection(obj_uuid, src, dst, args):
574     if arvados.util.keep_locator_pattern.match(obj_uuid):
575         # If the obj_uuid is a portable data hash, it might not be uniquely
576         # identified with a particular collection.  As a result, it is
577         # ambigious as to what name to use for the copy.  Apply some heuristics
578         # to pick which collection to get the name from.
579         srccol = src.collections().list(
580             filters=[['portable_data_hash', '=', obj_uuid]],
581             order="created_at asc"
582             ).execute(num_retries=args.retries)
583
584         items = srccol.get("items")
585
586         if not items:
587             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
588             return
589
590         c = None
591
592         if len(items) == 1:
593             # There's only one collection with the PDH, so use that.
594             c = items[0]
595         if not c:
596             # See if there is a collection that's in the same project
597             # as the root item (usually a pipeline) being copied.
598             for i in items:
599                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
600                     c = i
601                     break
602         if not c:
603             # Didn't find any collections located in the same project, so
604             # pick the oldest collection that has a name assigned to it.
605             for i in items:
606                 if i.get("name"):
607                     c = i
608                     break
609         if not c:
610             # None of the collections have names (?!), so just pick the
611             # first one.
612             c = items[0]
613
614         # list() doesn't return manifest text (and we don't want it to,
615         # because we don't need the same maninfest text sent to us 50
616         # times) so go and retrieve the collection object directly
617         # which will include the manifest text.
618         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
619     else:
620         # Assume this is an actual collection uuid, so fetch it directly.
621         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
622
623     # If a collection with this hash already exists at the
624     # destination, and 'force' is not true, just return that
625     # collection.
626     if not args.force:
627         if 'portable_data_hash' in c:
628             colhash = c['portable_data_hash']
629         else:
630             colhash = c['uuid']
631         dstcol = dst.collections().list(
632             filters=[['portable_data_hash', '=', colhash]]
633         ).execute(num_retries=args.retries)
634         if dstcol['items_available'] > 0:
635             for d in dstcol['items']:
636                 if ((args.project_uuid == d['owner_uuid']) and
637                     (c.get('name') == d['name']) and
638                     (c['portable_data_hash'] == d['portable_data_hash'])):
639                     return d
640             c['manifest_text'] = dst.collections().get(
641                 uuid=dstcol['items'][0]['uuid']
642             ).execute(num_retries=args.retries)['manifest_text']
643             return create_collection_from(c, src, dst, args)
644
645     # Fetch the collection's manifest.
646     manifest = c['manifest_text']
647     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
648
649     # Copy each block from src_keep to dst_keep.
650     # Use the newly signed locators returned from dst_keep to build
651     # a new manifest as we go.
652     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
653     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
654     dst_manifest = ""
655     dst_locators = {}
656     bytes_written = 0
657     bytes_expected = total_collection_size(manifest)
658     if args.progress:
659         progress_writer = ProgressWriter(human_progress)
660     else:
661         progress_writer = None
662
663     for line in manifest.splitlines():
664         words = line.split()
665         dst_manifest += words[0]
666         for word in words[1:]:
667             try:
668                 loc = arvados.KeepLocator(word)
669             except ValueError:
670                 # If 'word' can't be parsed as a locator,
671                 # presume it's a filename.
672                 dst_manifest += ' ' + word
673                 continue
674             blockhash = loc.md5sum
675             # copy this block if we haven't seen it before
676             # (otherwise, just reuse the existing dst_locator)
677             if blockhash not in dst_locators:
678                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
679                 if progress_writer:
680                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
681                 data = src_keep.get(word)
682                 dst_locator = dst_keep.put(data)
683                 dst_locators[blockhash] = dst_locator
684                 bytes_written += loc.size
685             dst_manifest += ' ' + dst_locators[blockhash]
686         dst_manifest += "\n"
687
688     if progress_writer:
689         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
690         progress_writer.finish()
691
692     # Copy the manifest and save the collection.
693     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
694
695     c['manifest_text'] = dst_manifest
696     return create_collection_from(c, src, dst, args)
697
698 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
699     r = api.repositories().list(
700         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
701     if r['items_available'] != 1:
702         raise Exception('cannot identify repo {}; {} repos found'
703                         .format(repo_name, r['items_available']))
704
705     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
706     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
707     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
708
709     priority = https_url + other_url + http_url
710
711     git_config = []
712     git_url = None
713     for url in priority:
714         if url.startswith("http"):
715             u = urlparse.urlsplit(url)
716             baseurl = urlparse.urlunsplit((u.scheme, u.netloc, "", "", ""))
717             git_config = ["-c", "credential.%s/.username=none" % baseurl,
718                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
719         else:
720             git_config = []
721
722         try:
723             logger.debug("trying %s", url)
724             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
725                                       env={"HOME": os.environ["HOME"],
726                                            "ARVADOS_API_TOKEN": api.api_token,
727                                            "GIT_ASKPASS": "/bin/false"})
728         except arvados.errors.CommandFailedError:
729             pass
730         else:
731             git_url = url
732             break
733
734     if not git_url:
735         raise Exception('Cannot access git repository, tried {}'
736                         .format(priority))
737
738     if git_url.startswith("http:"):
739         if allow_insecure_http:
740             logger.warn("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
741         else:
742             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
743
744     return (git_url, git_config)
745
746
747 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
748 #
749 #    Copies commits from git repository 'src_git_repo' on Arvados
750 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
751 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
752 #    or "jsmith")
753 #
754 #    All commits will be copied to a destination branch named for the
755 #    source repository URL.
756 #
757 #    The destination repository must already exist.
758 #
759 #    The user running this command must be authenticated
760 #    to both repositories.
761 #
762 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
763     # Identify the fetch and push URLs for the git repositories.
764
765     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
766     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
767
768     logger.debug('src_git_url: {}'.format(src_git_url))
769     logger.debug('dst_git_url: {}'.format(dst_git_url))
770
771     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
772
773     # Copy git commits from src repo to dst repo.
774     if src_git_repo not in local_repo_dir:
775         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
776         arvados.util.run_command(
777             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
778              local_repo_dir[src_git_repo]],
779             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
780             env={"HOME": os.environ["HOME"],
781                  "ARVADOS_API_TOKEN": src.api_token,
782                  "GIT_ASKPASS": "/bin/false"})
783         arvados.util.run_command(
784             ["git", "remote", "add", "dst", dst_git_url],
785             cwd=local_repo_dir[src_git_repo])
786     arvados.util.run_command(
787         ["git", "branch", dst_branch, script_version],
788         cwd=local_repo_dir[src_git_repo])
789     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
790                              cwd=local_repo_dir[src_git_repo],
791                              env={"HOME": os.environ["HOME"],
792                                   "ARVADOS_API_TOKEN": dst.api_token,
793                                   "GIT_ASKPASS": "/bin/false"})
794
795 def copy_docker_images(pipeline, src, dst, args):
796     """Copy any docker images named in the pipeline components'
797     runtime_constraints field from src to dst."""
798
799     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
800     for c_name, c_info in pipeline['components'].iteritems():
801         if ('runtime_constraints' in c_info and
802             'docker_image' in c_info['runtime_constraints']):
803             copy_docker_image(
804                 c_info['runtime_constraints']['docker_image'],
805                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
806                 src, dst, args)
807
808
809 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
810     """Copy the docker image identified by docker_image and
811     docker_image_tag from src to dst. Create appropriate
812     docker_image_repo+tag and docker_image_hash links at dst.
813
814     """
815
816     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
817
818     # Find the link identifying this docker image.
819     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
820         src, args.retries, docker_image, docker_image_tag)
821     if docker_image_list:
822         image_uuid, image_info = docker_image_list[0]
823         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
824
825         # Copy the collection it refers to.
826         dst_image_col = copy_collection(image_uuid, src, dst, args)
827     elif arvados.util.keep_locator_pattern.match(docker_image):
828         dst_image_col = copy_collection(docker_image, src, dst, args)
829     else:
830         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
831
832 # git_rev_parse(rev, repo)
833 #
834 #    Returns the 40-character commit hash corresponding to 'rev' in
835 #    git repository 'repo' (which must be the path of a local git
836 #    repository)
837 #
838 def git_rev_parse(rev, repo):
839     gitout, giterr = arvados.util.run_command(
840         ['git', 'rev-parse', rev], cwd=repo)
841     return gitout.strip()
842
843 # uuid_type(api, object_uuid)
844 #
845 #    Returns the name of the class that object_uuid belongs to, based on
846 #    the second field of the uuid.  This function consults the api's
847 #    schema to identify the object class.
848 #
849 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
850 #
851 #    Special case: if handed a Keep locator hash, return 'Collection'.
852 #
853 def uuid_type(api, object_uuid):
854     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
855         return 'Collection'
856     p = object_uuid.split('-')
857     if len(p) == 3:
858         type_prefix = p[1]
859         for k in api._schema.schemas:
860             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
861             if type_prefix == obj_class:
862                 return k
863     return None
864
865 def abort(msg, code=1):
866     logger.info("arv-copy: %s", msg)
867     exit(code)
868
869
870 # Code for reporting on the progress of a collection upload.
871 # Stolen from arvados.commands.put.ArvPutCollectionWriter
872 # TODO(twp): figure out how to refactor into a shared library
873 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
874 # code)
875
876 def machine_progress(obj_uuid, bytes_written, bytes_expected):
877     return "{} {}: {} {} written {} total\n".format(
878         sys.argv[0],
879         os.getpid(),
880         obj_uuid,
881         bytes_written,
882         -1 if (bytes_expected is None) else bytes_expected)
883
884 def human_progress(obj_uuid, bytes_written, bytes_expected):
885     if bytes_expected:
886         return "\r{}: {}M / {}M {:.1%} ".format(
887             obj_uuid,
888             bytes_written >> 20, bytes_expected >> 20,
889             float(bytes_written) / bytes_expected)
890     else:
891         return "\r{}: {} ".format(obj_uuid, bytes_written)
892
893 class ProgressWriter(object):
894     _progress_func = None
895     outfile = sys.stderr
896
897     def __init__(self, progress_func):
898         self._progress_func = progress_func
899
900     def report(self, obj_uuid, bytes_written, bytes_expected):
901         if self._progress_func is not None:
902             self.outfile.write(
903                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
904
905     def finish(self):
906         self.outfile.write("\n")
907
908 if __name__ == '__main__':
909     main()