16138: whitelist fields for arv-copy
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a pipeline instance, arv-copy copies the pipeline
12 # template, input collection, docker images, git repositories). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import sys
34 import logging
35 import tempfile
36 import urllib.parse
37
38 import arvados
39 import arvados.config
40 import arvados.keep
41 import arvados.util
42 import arvados.commands._util as arv_cmd
43 import arvados.commands.keepdocker
44 import ruamel.yaml as yaml
45
46 from arvados.api import OrderedJsonModel
47 from arvados._version import __version__
48
49 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
50
51 logger = logging.getLogger('arvados.arv-copy')
52
53 # local_repo_dir records which git repositories from the Arvados source
54 # instance have been checked out locally during this run, and to which
55 # directories.
56 # e.g. if repository 'twp' from src_arv has been cloned into
57 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
58 #
59 local_repo_dir = {}
60
61 # List of collections that have been copied in this session, and their
62 # destination collection UUIDs.
63 collections_copied = {}
64
65 # Set of (repository, script_version) two-tuples of commits copied in git.
66 scripts_copied = set()
67
68 # The owner_uuid of the object being copied
69 src_owner_uuid = None
70
71 def main():
72     copy_opts = argparse.ArgumentParser(add_help=False)
73
74     copy_opts.add_argument(
75         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
76         help='Print version and exit.')
77     copy_opts.add_argument(
78         '-v', '--verbose', dest='verbose', action='store_true',
79         help='Verbose output.')
80     copy_opts.add_argument(
81         '--progress', dest='progress', action='store_true',
82         help='Report progress on copying collections. (default)')
83     copy_opts.add_argument(
84         '--no-progress', dest='progress', action='store_false',
85         help='Do not report progress on copying collections.')
86     copy_opts.add_argument(
87         '-f', '--force', dest='force', action='store_true',
88         help='Perform copy even if the object appears to exist at the remote destination.')
89     copy_opts.add_argument(
90         '--force-filters', action='store_true', default=False,
91         help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
92     copy_opts.add_argument(
93         '--src', dest='source_arvados', required=True,
94         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
95     copy_opts.add_argument(
96         '--dst', dest='destination_arvados', required=True,
97         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
98     copy_opts.add_argument(
99         '--recursive', dest='recursive', action='store_true',
100         help='Recursively copy any dependencies for this object. (default)')
101     copy_opts.add_argument(
102         '--no-recursive', dest='recursive', action='store_false',
103         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
104     copy_opts.add_argument(
105         '--dst-git-repo', dest='dst_git_repo',
106         help='The name of the destination git repository. Required when copying a pipeline recursively.')
107     copy_opts.add_argument(
108         '--project-uuid', dest='project_uuid',
109         help='The UUID of the project at the destination to which the pipeline should be copied.')
110     copy_opts.add_argument(
111         '--allow-git-http-src', action="store_true",
112         help='Allow cloning git repositories over insecure http')
113     copy_opts.add_argument(
114         '--allow-git-http-dst', action="store_true",
115         help='Allow pushing git repositories over insecure http')
116
117     copy_opts.add_argument(
118         'object_uuid',
119         help='The UUID of the object to be copied.')
120     copy_opts.set_defaults(progress=True)
121     copy_opts.set_defaults(recursive=True)
122
123     parser = argparse.ArgumentParser(
124         description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
125         parents=[copy_opts, arv_cmd.retry_opt])
126     args = parser.parse_args()
127
128     if args.verbose:
129         logger.setLevel(logging.DEBUG)
130     else:
131         logger.setLevel(logging.INFO)
132
133     # Create API clients for the source and destination instances
134     src_arv = api_for_instance(args.source_arvados)
135     dst_arv = api_for_instance(args.destination_arvados)
136
137     if not args.project_uuid:
138         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
139
140     # Identify the kind of object we have been given, and begin copying.
141     t = uuid_type(src_arv, args.object_uuid)
142     if t == 'Collection':
143         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
144         result = copy_collection(args.object_uuid,
145                                  src_arv, dst_arv,
146                                  args)
147     elif t == 'Workflow':
148         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
149         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
150     else:
151         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
152
153     # Clean up any outstanding temp git repositories.
154     for d in listvalues(local_repo_dir):
155         shutil.rmtree(d, ignore_errors=True)
156
157     # If no exception was thrown and the response does not have an
158     # error_token field, presume success
159     if 'error_token' in result or 'uuid' not in result:
160         logger.error("API server returned an error result: {}".format(result))
161         exit(1)
162
163     logger.info("")
164     logger.info("Success: created copy with uuid {}".format(result['uuid']))
165     exit(0)
166
167 def set_src_owner_uuid(resource, uuid, args):
168     global src_owner_uuid
169     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
170     src_owner_uuid = c.get("owner_uuid")
171
172 # api_for_instance(instance_name)
173 #
174 #     Creates an API client for the Arvados instance identified by
175 #     instance_name.
176 #
177 #     If instance_name contains a slash, it is presumed to be a path
178 #     (either local or absolute) to a file with Arvados configuration
179 #     settings.
180 #
181 #     Otherwise, it is presumed to be the name of a file in
182 #     $HOME/.config/arvados/instance_name.conf
183 #
184 def api_for_instance(instance_name):
185     if '/' in instance_name:
186         config_file = instance_name
187     else:
188         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
189
190     try:
191         cfg = arvados.config.load(config_file)
192     except (IOError, OSError) as e:
193         abort(("Could not open config file {}: {}\n" +
194                "You must make sure that your configuration tokens\n" +
195                "for Arvados instance {} are in {} and that this\n" +
196                "file is readable.").format(
197                    config_file, e, instance_name, config_file))
198
199     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
200         api_is_insecure = (
201             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
202                 ['1', 't', 'true', 'y', 'yes']))
203         client = arvados.api('v1',
204                              host=cfg['ARVADOS_API_HOST'],
205                              token=cfg['ARVADOS_API_TOKEN'],
206                              insecure=api_is_insecure,
207                              model=OrderedJsonModel())
208     else:
209         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
210     return client
211
212 # Check if git is available
213 def check_git_availability():
214     try:
215         arvados.util.run_command(['git', '--help'])
216     except Exception:
217         abort('git command is not available. Please ensure git is installed.')
218
219
220 def filter_iter(arg):
221     """Iterate a filter string-or-list.
222
223     Pass in a filter field that can either be a string or list.
224     This will iterate elements as if the field had been written as a list.
225     """
226     if isinstance(arg, basestring):
227         return iter((arg,))
228     else:
229         return iter(arg)
230
231 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
232     """Update a single repository filter in-place for the destination.
233
234     If the filter checks that the repository is src_repository, it is
235     updated to check that the repository is dst_repository.  If it does
236     anything else, this function raises ValueError.
237     """
238     if src_repository is None:
239         raise ValueError("component does not specify a source repository")
240     elif dst_repository is None:
241         raise ValueError("no destination repository specified to update repository filter")
242     elif repo_filter[1:] == ['=', src_repository]:
243         repo_filter[2] = dst_repository
244     elif repo_filter[1:] == ['in', [src_repository]]:
245         repo_filter[2] = [dst_repository]
246     else:
247         raise ValueError("repository filter is not a simple source match")
248
249 def migrate_script_version_filter(version_filter):
250     """Update a single script_version filter in-place for the destination.
251
252     Currently this function checks that all the filter operands are Git
253     commit hashes.  If they're not, it raises ValueError to indicate that
254     the filter is not portable.  It could be extended to make other
255     transformations in the future.
256     """
257     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
258         raise ValueError("script_version filter is not limited to commit hashes")
259
260 def attr_filtered(filter_, *attr_names):
261     """Return True if filter_ applies to any of attr_names, else False."""
262     return any((name == 'any') or (name in attr_names)
263                for name in filter_iter(filter_[0]))
264
265 @contextlib.contextmanager
266 def exception_handler(handler, *exc_types):
267     """If any exc_types are raised in the block, call handler on the exception."""
268     try:
269         yield
270     except exc_types as error:
271         handler(error)
272
273 def migrate_components_filters(template_components, dst_git_repo):
274     """Update template component filters in-place for the destination.
275
276     template_components is a dictionary of components in a pipeline template.
277     This method walks over each component's filters, and updates them to have
278     identical semantics on the destination cluster.  It returns a list of
279     error strings that describe what filters could not be updated safely.
280
281     dst_git_repo is the name of the destination Git repository, which can
282     be None if that is not known.
283     """
284     errors = []
285     for cname, cspec in template_components.items():
286         def add_error(errmsg):
287             errors.append("{}: {}".format(cname, errmsg))
288         if not isinstance(cspec, dict):
289             add_error("value is not a component definition")
290             continue
291         src_repository = cspec.get('repository')
292         filters = cspec.get('filters', [])
293         if not isinstance(filters, list):
294             add_error("filters are not a list")
295             continue
296         for cfilter in filters:
297             if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
298                 add_error("malformed filter {!r}".format(cfilter))
299                 continue
300             if attr_filtered(cfilter, 'repository'):
301                 with exception_handler(add_error, ValueError):
302                     migrate_repository_filter(cfilter, src_repository, dst_git_repo)
303             if attr_filtered(cfilter, 'script_version'):
304                 with exception_handler(add_error, ValueError):
305                     migrate_script_version_filter(cfilter)
306     return errors
307
308
309 # copy_workflow(wf_uuid, src, dst, args)
310 #
311 #    Copies a workflow identified by wf_uuid from src to dst.
312 #
313 #    If args.recursive is True, also copy any collections
314 #      referenced in the workflow definition yaml.
315 #
316 #    The owner_uuid of the new workflow is set to any given
317 #      project_uuid or the user who copied the template.
318 #
319 #    Returns the copied workflow object.
320 #
321 def copy_workflow(wf_uuid, src, dst, args):
322     # fetch the workflow from the source instance
323     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
324
325     # copy collections and docker images
326     if args.recursive:
327         wf_def = yaml.safe_load(wf["definition"])
328         if wf_def is not None:
329             locations = []
330             docker_images = {}
331             graph = wf_def.get('$graph', None)
332             if graph is not None:
333                 workflow_collections(graph, locations, docker_images)
334             else:
335                 workflow_collections(wf_def, locations, docker_images)
336
337             if locations:
338                 copy_collections(locations, src, dst, args)
339
340             for image in docker_images:
341                 copy_docker_image(image, docker_images[image], src, dst, args)
342
343     # copy the workflow itself
344     del wf['uuid']
345     wf['owner_uuid'] = args.project_uuid
346     return dst.workflows().create(body=wf).execute(num_retries=args.retries)
347
348 def workflow_collections(obj, locations, docker_images):
349     if isinstance(obj, dict):
350         loc = obj.get('location', None)
351         if loc is not None:
352             if loc.startswith("keep:"):
353                 locations.append(loc[5:])
354
355         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
356         if docker_image is not None:
357             ds = docker_image.split(":", 1)
358             tag = ds[1] if len(ds)==2 else 'latest'
359             docker_images[ds[0]] = tag
360
361         for x in obj:
362             workflow_collections(obj[x], locations, docker_images)
363     elif isinstance(obj, list):
364         for x in obj:
365             workflow_collections(x, locations, docker_images)
366
367 # copy_collections(obj, src, dst, args)
368 #
369 #    Recursively copies all collections referenced by 'obj' from src
370 #    to dst.  obj may be a dict or a list, in which case we run
371 #    copy_collections on every value it contains. If it is a string,
372 #    search it for any substring that matches a collection hash or uuid
373 #    (this will find hidden references to collections like
374 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
375 #
376 #    Returns a copy of obj with any old collection uuids replaced by
377 #    the new ones.
378 #
379 def copy_collections(obj, src, dst, args):
380
381     def copy_collection_fn(collection_match):
382         """Helper function for regex substitution: copies a single collection,
383         identified by the collection_match MatchObject, to the
384         destination.  Returns the destination collection uuid (or the
385         portable data hash if that's what src_id is).
386
387         """
388         src_id = collection_match.group(0)
389         if src_id not in collections_copied:
390             dst_col = copy_collection(src_id, src, dst, args)
391             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
392                 collections_copied[src_id] = src_id
393             else:
394                 collections_copied[src_id] = dst_col['uuid']
395         return collections_copied[src_id]
396
397     if isinstance(obj, basestring):
398         # Copy any collections identified in this string to dst, replacing
399         # them with the dst uuids as necessary.
400         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
401         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
402         return obj
403     elif isinstance(obj, dict):
404         return type(obj)((v, copy_collections(obj[v], src, dst, args))
405                          for v in obj)
406     elif isinstance(obj, list):
407         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
408     return obj
409
410 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
411     """Copy a job's script to the destination repository, and update its record.
412
413     Given a jobspec dictionary, this function finds the referenced script from
414     src and copies it to dst and dst_repo.  It also updates jobspec in place to
415     refer to names on the destination.
416     """
417     repo = jobspec.get('repository')
418     if repo is None:
419         return
420     # script_version is the "script_version" parameter from the source
421     # component or job.  If no script_version was supplied in the
422     # component or job, it is a mistake in the pipeline, but for the
423     # purposes of copying the repository, default to "master".
424     script_version = jobspec.get('script_version') or 'master'
425     script_key = (repo, script_version)
426     if script_key not in scripts_copied:
427         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
428         scripts_copied.add(script_key)
429     jobspec['repository'] = dst_repo
430     repo_dir = local_repo_dir[repo]
431     for version_key in ['script_version', 'supplied_script_version']:
432         if version_key in jobspec:
433             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
434
435 # copy_git_repos(p, src, dst, dst_repo, args)
436 #
437 #    Copies all git repositories referenced by pipeline instance or
438 #    template 'p' from src to dst.
439 #
440 #    For each component c in the pipeline:
441 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
442 #      * Rename script versions:
443 #          * c['script_version']
444 #          * c['job']['script_version']
445 #          * c['job']['supplied_script_version']
446 #        to the commit hashes they resolve to, since any symbolic
447 #        names (tags, branches) are not preserved in the destination repo.
448 #
449 #    The pipeline object is updated in place with the new repository
450 #    names.  The return value is undefined.
451 #
452 def copy_git_repos(p, src, dst, dst_repo, args):
453     for component in p['components'].values():
454         migrate_jobspec(component, src, dst, dst_repo, args)
455         if 'job' in component:
456             migrate_jobspec(component['job'], src, dst, dst_repo, args)
457
458 def total_collection_size(manifest_text):
459     """Return the total number of bytes in this collection (excluding
460     duplicate blocks)."""
461
462     total_bytes = 0
463     locators_seen = {}
464     for line in manifest_text.splitlines():
465         words = line.split()
466         for word in words[1:]:
467             try:
468                 loc = arvados.KeepLocator(word)
469             except ValueError:
470                 continue  # this word isn't a locator, skip it
471             if loc.md5sum not in locators_seen:
472                 locators_seen[loc.md5sum] = True
473                 total_bytes += loc.size
474
475     return total_bytes
476
477 def create_collection_from(c, src, dst, args):
478     """Create a new collection record on dst, and copy Docker metadata if
479     available."""
480
481     collection_uuid = c['uuid']
482     body = {}
483     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
484         body[d] = c[d]
485
486     if not body["name"]:
487         body['name'] = "copied from " + collection_uuid
488
489     body['owner_uuid'] = args.project_uuid
490
491     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
492
493     # Create docker_image_repo+tag and docker_image_hash links
494     # at the destination.
495     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
496         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
497
498         for src_link in docker_links:
499             body = {key: src_link[key]
500                     for key in ['link_class', 'name', 'properties']}
501             body['head_uuid'] = dst_collection['uuid']
502             body['owner_uuid'] = args.project_uuid
503
504             lk = dst.links().create(body=body).execute(num_retries=args.retries)
505             logger.debug('created dst link {}'.format(lk))
506
507     return dst_collection
508
509 # copy_collection(obj_uuid, src, dst, args)
510 #
511 #    Copies the collection identified by obj_uuid from src to dst.
512 #    Returns the collection object created at dst.
513 #
514 #    If args.progress is True, produce a human-friendly progress
515 #    report.
516 #
517 #    If a collection with the desired portable_data_hash already
518 #    exists at dst, and args.force is False, copy_collection returns
519 #    the existing collection without copying any blocks.  Otherwise
520 #    (if no collection exists or if args.force is True)
521 #    copy_collection copies all of the collection data blocks from src
522 #    to dst.
523 #
524 #    For this application, it is critical to preserve the
525 #    collection's manifest hash, which is not guaranteed with the
526 #    arvados.CollectionReader and arvados.CollectionWriter classes.
527 #    Copying each block in the collection manually, followed by
528 #    the manifest block, ensures that the collection's manifest
529 #    hash will not change.
530 #
531 def copy_collection(obj_uuid, src, dst, args):
532     if arvados.util.keep_locator_pattern.match(obj_uuid):
533         # If the obj_uuid is a portable data hash, it might not be
534         # uniquely identified with a particular collection.  As a
535         # result, it is ambiguous as to what name to use for the copy.
536         # Apply some heuristics to pick which collection to get the
537         # name from.
538         srccol = src.collections().list(
539             filters=[['portable_data_hash', '=', obj_uuid]],
540             order="created_at asc"
541             ).execute(num_retries=args.retries)
542
543         items = srccol.get("items")
544
545         if not items:
546             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
547             return
548
549         c = None
550
551         if len(items) == 1:
552             # There's only one collection with the PDH, so use that.
553             c = items[0]
554         if not c:
555             # See if there is a collection that's in the same project
556             # as the root item (usually a pipeline) being copied.
557             for i in items:
558                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
559                     c = i
560                     break
561         if not c:
562             # Didn't find any collections located in the same project, so
563             # pick the oldest collection that has a name assigned to it.
564             for i in items:
565                 if i.get("name"):
566                     c = i
567                     break
568         if not c:
569             # None of the collections have names (?!), so just pick the
570             # first one.
571             c = items[0]
572
573         # list() doesn't return manifest text (and we don't want it to,
574         # because we don't need the same maninfest text sent to us 50
575         # times) so go and retrieve the collection object directly
576         # which will include the manifest text.
577         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
578     else:
579         # Assume this is an actual collection uuid, so fetch it directly.
580         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
581
582     # If a collection with this hash already exists at the
583     # destination, and 'force' is not true, just return that
584     # collection.
585     if not args.force:
586         if 'portable_data_hash' in c:
587             colhash = c['portable_data_hash']
588         else:
589             colhash = c['uuid']
590         dstcol = dst.collections().list(
591             filters=[['portable_data_hash', '=', colhash]]
592         ).execute(num_retries=args.retries)
593         if dstcol['items_available'] > 0:
594             for d in dstcol['items']:
595                 if ((args.project_uuid == d['owner_uuid']) and
596                     (c.get('name') == d['name']) and
597                     (c['portable_data_hash'] == d['portable_data_hash'])):
598                     return d
599             c['manifest_text'] = dst.collections().get(
600                 uuid=dstcol['items'][0]['uuid']
601             ).execute(num_retries=args.retries)['manifest_text']
602             return create_collection_from(c, src, dst, args)
603
604     # Fetch the collection's manifest.
605     manifest = c['manifest_text']
606     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
607
608     # Copy each block from src_keep to dst_keep.
609     # Use the newly signed locators returned from dst_keep to build
610     # a new manifest as we go.
611     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
612     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
613     dst_manifest = ""
614     dst_locators = {}
615     bytes_written = 0
616     bytes_expected = total_collection_size(manifest)
617     if args.progress:
618         progress_writer = ProgressWriter(human_progress)
619     else:
620         progress_writer = None
621
622     for line in manifest.splitlines():
623         words = line.split()
624         dst_manifest += words[0]
625         for word in words[1:]:
626             try:
627                 loc = arvados.KeepLocator(word)
628             except ValueError:
629                 # If 'word' can't be parsed as a locator,
630                 # presume it's a filename.
631                 dst_manifest += ' ' + word
632                 continue
633             blockhash = loc.md5sum
634             # copy this block if we haven't seen it before
635             # (otherwise, just reuse the existing dst_locator)
636             if blockhash not in dst_locators:
637                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
638                 if progress_writer:
639                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
640                 data = src_keep.get(word)
641                 dst_locator = dst_keep.put(data)
642                 dst_locators[blockhash] = dst_locator
643                 bytes_written += loc.size
644             dst_manifest += ' ' + dst_locators[blockhash]
645         dst_manifest += "\n"
646
647     if progress_writer:
648         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
649         progress_writer.finish()
650
651     # Copy the manifest and save the collection.
652     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
653
654     c['manifest_text'] = dst_manifest
655     return create_collection_from(c, src, dst, args)
656
657 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
658     r = api.repositories().list(
659         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
660     if r['items_available'] != 1:
661         raise Exception('cannot identify repo {}; {} repos found'
662                         .format(repo_name, r['items_available']))
663
664     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
665     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
666     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
667
668     priority = https_url + other_url + http_url
669
670     git_config = []
671     git_url = None
672     for url in priority:
673         if url.startswith("http"):
674             u = urllib.parse.urlsplit(url)
675             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
676             git_config = ["-c", "credential.%s/.username=none" % baseurl,
677                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
678         else:
679             git_config = []
680
681         try:
682             logger.debug("trying %s", url)
683             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
684                                       env={"HOME": os.environ["HOME"],
685                                            "ARVADOS_API_TOKEN": api.api_token,
686                                            "GIT_ASKPASS": "/bin/false"})
687         except arvados.errors.CommandFailedError:
688             pass
689         else:
690             git_url = url
691             break
692
693     if not git_url:
694         raise Exception('Cannot access git repository, tried {}'
695                         .format(priority))
696
697     if git_url.startswith("http:"):
698         if allow_insecure_http:
699             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
700         else:
701             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
702
703     return (git_url, git_config)
704
705
706 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
707 #
708 #    Copies commits from git repository 'src_git_repo' on Arvados
709 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
710 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
711 #    or "jsmith")
712 #
713 #    All commits will be copied to a destination branch named for the
714 #    source repository URL.
715 #
716 #    The destination repository must already exist.
717 #
718 #    The user running this command must be authenticated
719 #    to both repositories.
720 #
721 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
722     # Identify the fetch and push URLs for the git repositories.
723
724     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
725     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
726
727     logger.debug('src_git_url: {}'.format(src_git_url))
728     logger.debug('dst_git_url: {}'.format(dst_git_url))
729
730     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
731
732     # Copy git commits from src repo to dst repo.
733     if src_git_repo not in local_repo_dir:
734         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
735         arvados.util.run_command(
736             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
737              local_repo_dir[src_git_repo]],
738             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
739             env={"HOME": os.environ["HOME"],
740                  "ARVADOS_API_TOKEN": src.api_token,
741                  "GIT_ASKPASS": "/bin/false"})
742         arvados.util.run_command(
743             ["git", "remote", "add", "dst", dst_git_url],
744             cwd=local_repo_dir[src_git_repo])
745     arvados.util.run_command(
746         ["git", "branch", dst_branch, script_version],
747         cwd=local_repo_dir[src_git_repo])
748     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
749                              cwd=local_repo_dir[src_git_repo],
750                              env={"HOME": os.environ["HOME"],
751                                   "ARVADOS_API_TOKEN": dst.api_token,
752                                   "GIT_ASKPASS": "/bin/false"})
753
754 def copy_docker_images(pipeline, src, dst, args):
755     """Copy any docker images named in the pipeline components'
756     runtime_constraints field from src to dst."""
757
758     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
759     for c_name, c_info in pipeline['components'].items():
760         if ('runtime_constraints' in c_info and
761             'docker_image' in c_info['runtime_constraints']):
762             copy_docker_image(
763                 c_info['runtime_constraints']['docker_image'],
764                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
765                 src, dst, args)
766
767
768 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
769     """Copy the docker image identified by docker_image and
770     docker_image_tag from src to dst. Create appropriate
771     docker_image_repo+tag and docker_image_hash links at dst.
772
773     """
774
775     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
776
777     # Find the link identifying this docker image.
778     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
779         src, args.retries, docker_image, docker_image_tag)
780     if docker_image_list:
781         image_uuid, image_info = docker_image_list[0]
782         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
783
784         # Copy the collection it refers to.
785         dst_image_col = copy_collection(image_uuid, src, dst, args)
786     elif arvados.util.keep_locator_pattern.match(docker_image):
787         dst_image_col = copy_collection(docker_image, src, dst, args)
788     else:
789         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
790
791 # git_rev_parse(rev, repo)
792 #
793 #    Returns the 40-character commit hash corresponding to 'rev' in
794 #    git repository 'repo' (which must be the path of a local git
795 #    repository)
796 #
797 def git_rev_parse(rev, repo):
798     gitout, giterr = arvados.util.run_command(
799         ['git', 'rev-parse', rev], cwd=repo)
800     return gitout.strip()
801
802 # uuid_type(api, object_uuid)
803 #
804 #    Returns the name of the class that object_uuid belongs to, based on
805 #    the second field of the uuid.  This function consults the api's
806 #    schema to identify the object class.
807 #
808 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
809 #
810 #    Special case: if handed a Keep locator hash, return 'Collection'.
811 #
812 def uuid_type(api, object_uuid):
813     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
814         return 'Collection'
815     p = object_uuid.split('-')
816     if len(p) == 3:
817         type_prefix = p[1]
818         for k in api._schema.schemas:
819             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
820             if type_prefix == obj_class:
821                 return k
822     return None
823
824 def abort(msg, code=1):
825     logger.info("arv-copy: %s", msg)
826     exit(code)
827
828
829 # Code for reporting on the progress of a collection upload.
830 # Stolen from arvados.commands.put.ArvPutCollectionWriter
831 # TODO(twp): figure out how to refactor into a shared library
832 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
833 # code)
834
835 def machine_progress(obj_uuid, bytes_written, bytes_expected):
836     return "{} {}: {} {} written {} total\n".format(
837         sys.argv[0],
838         os.getpid(),
839         obj_uuid,
840         bytes_written,
841         -1 if (bytes_expected is None) else bytes_expected)
842
843 def human_progress(obj_uuid, bytes_written, bytes_expected):
844     if bytes_expected:
845         return "\r{}: {}M / {}M {:.1%} ".format(
846             obj_uuid,
847             bytes_written >> 20, bytes_expected >> 20,
848             float(bytes_written) / bytes_expected)
849     else:
850         return "\r{}: {} ".format(obj_uuid, bytes_written)
851
852 class ProgressWriter(object):
853     _progress_func = None
854     outfile = sys.stderr
855
856     def __init__(self, progress_func):
857         self._progress_func = progress_func
858
859     def report(self, obj_uuid, bytes_written, bytes_expected):
860         if self._progress_func is not None:
861             self.outfile.write(
862                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
863
864     def finish(self):
865         self.outfile.write("\n")
866
867 if __name__ == '__main__':
868     main()