16138: Remove additional pipelines related code
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import sys
34 import logging
35 import tempfile
36 import urllib.parse
37
38 import arvados
39 import arvados.config
40 import arvados.keep
41 import arvados.util
42 import arvados.commands._util as arv_cmd
43 import arvados.commands.keepdocker
44 import ruamel.yaml as yaml
45
46 from arvados.api import OrderedJsonModel
47 from arvados._version import __version__
48
49 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
50
51 logger = logging.getLogger('arvados.arv-copy')
52
53 # local_repo_dir records which git repositories from the Arvados source
54 # instance have been checked out locally during this run, and to which
55 # directories.
56 # e.g. if repository 'twp' from src_arv has been cloned into
57 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
58 #
59 local_repo_dir = {}
60
61 # List of collections that have been copied in this session, and their
62 # destination collection UUIDs.
63 collections_copied = {}
64
65 # Set of (repository, script_version) two-tuples of commits copied in git.
66 scripts_copied = set()
67
68 # The owner_uuid of the object being copied
69 src_owner_uuid = None
70
71 def main():
72     copy_opts = argparse.ArgumentParser(add_help=False)
73
74     copy_opts.add_argument(
75         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
76         help='Print version and exit.')
77     copy_opts.add_argument(
78         '-v', '--verbose', dest='verbose', action='store_true',
79         help='Verbose output.')
80     copy_opts.add_argument(
81         '--progress', dest='progress', action='store_true',
82         help='Report progress on copying collections. (default)')
83     copy_opts.add_argument(
84         '--no-progress', dest='progress', action='store_false',
85         help='Do not report progress on copying collections.')
86     copy_opts.add_argument(
87         '-f', '--force', dest='force', action='store_true',
88         help='Perform copy even if the object appears to exist at the remote destination.')
89     copy_opts.add_argument(
90         '--src', dest='source_arvados', required=True,
91         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
92     copy_opts.add_argument(
93         '--dst', dest='destination_arvados', required=True,
94         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
95     copy_opts.add_argument(
96         '--recursive', dest='recursive', action='store_true',
97         help='Recursively copy any dependencies for this object. (default)')
98     copy_opts.add_argument(
99         '--no-recursive', dest='recursive', action='store_false',
100         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
101     copy_opts.add_argument(
102         '--dst-git-repo', dest='dst_git_repo',
103         help='The name of the destination git repository. Required when copying a pipeline recursively.')
104     copy_opts.add_argument(
105         '--project-uuid', dest='project_uuid',
106         help='The UUID of the project at the destination to which the pipeline should be copied.')
107     copy_opts.add_argument(
108         '--allow-git-http-src', action="store_true",
109         help='Allow cloning git repositories over insecure http')
110     copy_opts.add_argument(
111         '--allow-git-http-dst', action="store_true",
112         help='Allow pushing git repositories over insecure http')
113
114     copy_opts.add_argument(
115         'object_uuid',
116         help='The UUID of the object to be copied.')
117     copy_opts.set_defaults(progress=True)
118     copy_opts.set_defaults(recursive=True)
119
120     parser = argparse.ArgumentParser(
121         description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
122         parents=[copy_opts, arv_cmd.retry_opt])
123     args = parser.parse_args()
124
125     if args.verbose:
126         logger.setLevel(logging.DEBUG)
127     else:
128         logger.setLevel(logging.INFO)
129
130     # Create API clients for the source and destination instances
131     src_arv = api_for_instance(args.source_arvados)
132     dst_arv = api_for_instance(args.destination_arvados)
133
134     if not args.project_uuid:
135         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
136
137     # Identify the kind of object we have been given, and begin copying.
138     t = uuid_type(src_arv, args.object_uuid)
139     if t == 'Collection':
140         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
141         result = copy_collection(args.object_uuid,
142                                  src_arv, dst_arv,
143                                  args)
144     elif t == 'Workflow':
145         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
146         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
147     else:
148         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
149
150     # Clean up any outstanding temp git repositories.
151     for d in listvalues(local_repo_dir):
152         shutil.rmtree(d, ignore_errors=True)
153
154     # If no exception was thrown and the response does not have an
155     # error_token field, presume success
156     if 'error_token' in result or 'uuid' not in result:
157         logger.error("API server returned an error result: {}".format(result))
158         exit(1)
159
160     logger.info("")
161     logger.info("Success: created copy with uuid {}".format(result['uuid']))
162     exit(0)
163
164 def set_src_owner_uuid(resource, uuid, args):
165     global src_owner_uuid
166     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
167     src_owner_uuid = c.get("owner_uuid")
168
169 # api_for_instance(instance_name)
170 #
171 #     Creates an API client for the Arvados instance identified by
172 #     instance_name.
173 #
174 #     If instance_name contains a slash, it is presumed to be a path
175 #     (either local or absolute) to a file with Arvados configuration
176 #     settings.
177 #
178 #     Otherwise, it is presumed to be the name of a file in
179 #     $HOME/.config/arvados/instance_name.conf
180 #
181 def api_for_instance(instance_name):
182     if '/' in instance_name:
183         config_file = instance_name
184     else:
185         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
186
187     try:
188         cfg = arvados.config.load(config_file)
189     except (IOError, OSError) as e:
190         abort(("Could not open config file {}: {}\n" +
191                "You must make sure that your configuration tokens\n" +
192                "for Arvados instance {} are in {} and that this\n" +
193                "file is readable.").format(
194                    config_file, e, instance_name, config_file))
195
196     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
197         api_is_insecure = (
198             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
199                 ['1', 't', 'true', 'y', 'yes']))
200         client = arvados.api('v1',
201                              host=cfg['ARVADOS_API_HOST'],
202                              token=cfg['ARVADOS_API_TOKEN'],
203                              insecure=api_is_insecure,
204                              model=OrderedJsonModel())
205     else:
206         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
207     return client
208
209 # Check if git is available
210 def check_git_availability():
211     try:
212         arvados.util.run_command(['git', '--help'])
213     except Exception:
214         abort('git command is not available. Please ensure git is installed.')
215
216
217 def filter_iter(arg):
218     """Iterate a filter string-or-list.
219
220     Pass in a filter field that can either be a string or list.
221     This will iterate elements as if the field had been written as a list.
222     """
223     if isinstance(arg, basestring):
224         return iter((arg,))
225     else:
226         return iter(arg)
227
228 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
229     """Update a single repository filter in-place for the destination.
230
231     If the filter checks that the repository is src_repository, it is
232     updated to check that the repository is dst_repository.  If it does
233     anything else, this function raises ValueError.
234     """
235     if src_repository is None:
236         raise ValueError("component does not specify a source repository")
237     elif dst_repository is None:
238         raise ValueError("no destination repository specified to update repository filter")
239     elif repo_filter[1:] == ['=', src_repository]:
240         repo_filter[2] = dst_repository
241     elif repo_filter[1:] == ['in', [src_repository]]:
242         repo_filter[2] = [dst_repository]
243     else:
244         raise ValueError("repository filter is not a simple source match")
245
246 def migrate_script_version_filter(version_filter):
247     """Update a single script_version filter in-place for the destination.
248
249     Currently this function checks that all the filter operands are Git
250     commit hashes.  If they're not, it raises ValueError to indicate that
251     the filter is not portable.  It could be extended to make other
252     transformations in the future.
253     """
254     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
255         raise ValueError("script_version filter is not limited to commit hashes")
256
257 def attr_filtered(filter_, *attr_names):
258     """Return True if filter_ applies to any of attr_names, else False."""
259     return any((name == 'any') or (name in attr_names)
260                for name in filter_iter(filter_[0]))
261
262 @contextlib.contextmanager
263 def exception_handler(handler, *exc_types):
264     """If any exc_types are raised in the block, call handler on the exception."""
265     try:
266         yield
267     except exc_types as error:
268         handler(error)
269
270
271 # copy_workflow(wf_uuid, src, dst, args)
272 #
273 #    Copies a workflow identified by wf_uuid from src to dst.
274 #
275 #    If args.recursive is True, also copy any collections
276 #      referenced in the workflow definition yaml.
277 #
278 #    The owner_uuid of the new workflow is set to any given
279 #      project_uuid or the user who copied the template.
280 #
281 #    Returns the copied workflow object.
282 #
283 def copy_workflow(wf_uuid, src, dst, args):
284     # fetch the workflow from the source instance
285     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
286
287     # copy collections and docker images
288     if args.recursive:
289         wf_def = yaml.safe_load(wf["definition"])
290         if wf_def is not None:
291             locations = []
292             docker_images = {}
293             graph = wf_def.get('$graph', None)
294             if graph is not None:
295                 workflow_collections(graph, locations, docker_images)
296             else:
297                 workflow_collections(wf_def, locations, docker_images)
298
299             if locations:
300                 copy_collections(locations, src, dst, args)
301
302             for image in docker_images:
303                 copy_docker_image(image, docker_images[image], src, dst, args)
304
305     # copy the workflow itself
306     del wf['uuid']
307     wf['owner_uuid'] = args.project_uuid
308     return dst.workflows().create(body=wf).execute(num_retries=args.retries)
309
310 def workflow_collections(obj, locations, docker_images):
311     if isinstance(obj, dict):
312         loc = obj.get('location', None)
313         if loc is not None:
314             if loc.startswith("keep:"):
315                 locations.append(loc[5:])
316
317         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
318         if docker_image is not None:
319             ds = docker_image.split(":", 1)
320             tag = ds[1] if len(ds)==2 else 'latest'
321             docker_images[ds[0]] = tag
322
323         for x in obj:
324             workflow_collections(obj[x], locations, docker_images)
325     elif isinstance(obj, list):
326         for x in obj:
327             workflow_collections(x, locations, docker_images)
328
329 # copy_collections(obj, src, dst, args)
330 #
331 #    Recursively copies all collections referenced by 'obj' from src
332 #    to dst.  obj may be a dict or a list, in which case we run
333 #    copy_collections on every value it contains. If it is a string,
334 #    search it for any substring that matches a collection hash or uuid
335 #    (this will find hidden references to collections like
336 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
337 #
338 #    Returns a copy of obj with any old collection uuids replaced by
339 #    the new ones.
340 #
341 def copy_collections(obj, src, dst, args):
342
343     def copy_collection_fn(collection_match):
344         """Helper function for regex substitution: copies a single collection,
345         identified by the collection_match MatchObject, to the
346         destination.  Returns the destination collection uuid (or the
347         portable data hash if that's what src_id is).
348
349         """
350         src_id = collection_match.group(0)
351         if src_id not in collections_copied:
352             dst_col = copy_collection(src_id, src, dst, args)
353             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
354                 collections_copied[src_id] = src_id
355             else:
356                 collections_copied[src_id] = dst_col['uuid']
357         return collections_copied[src_id]
358
359     if isinstance(obj, basestring):
360         # Copy any collections identified in this string to dst, replacing
361         # them with the dst uuids as necessary.
362         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
363         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
364         return obj
365     elif isinstance(obj, dict):
366         return type(obj)((v, copy_collections(obj[v], src, dst, args))
367                          for v in obj)
368     elif isinstance(obj, list):
369         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
370     return obj
371
372
373 def total_collection_size(manifest_text):
374     """Return the total number of bytes in this collection (excluding
375     duplicate blocks)."""
376
377     total_bytes = 0
378     locators_seen = {}
379     for line in manifest_text.splitlines():
380         words = line.split()
381         for word in words[1:]:
382             try:
383                 loc = arvados.KeepLocator(word)
384             except ValueError:
385                 continue  # this word isn't a locator, skip it
386             if loc.md5sum not in locators_seen:
387                 locators_seen[loc.md5sum] = True
388                 total_bytes += loc.size
389
390     return total_bytes
391
392 def create_collection_from(c, src, dst, args):
393     """Create a new collection record on dst, and copy Docker metadata if
394     available."""
395
396     collection_uuid = c['uuid']
397     body = {}
398     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
399         body[d] = c[d]
400
401     if not body["name"]:
402         body['name'] = "copied from " + collection_uuid
403
404     body['owner_uuid'] = args.project_uuid
405
406     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
407
408     # Create docker_image_repo+tag and docker_image_hash links
409     # at the destination.
410     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
411         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
412
413         for src_link in docker_links:
414             body = {key: src_link[key]
415                     for key in ['link_class', 'name', 'properties']}
416             body['head_uuid'] = dst_collection['uuid']
417             body['owner_uuid'] = args.project_uuid
418
419             lk = dst.links().create(body=body).execute(num_retries=args.retries)
420             logger.debug('created dst link {}'.format(lk))
421
422     return dst_collection
423
424 # copy_collection(obj_uuid, src, dst, args)
425 #
426 #    Copies the collection identified by obj_uuid from src to dst.
427 #    Returns the collection object created at dst.
428 #
429 #    If args.progress is True, produce a human-friendly progress
430 #    report.
431 #
432 #    If a collection with the desired portable_data_hash already
433 #    exists at dst, and args.force is False, copy_collection returns
434 #    the existing collection without copying any blocks.  Otherwise
435 #    (if no collection exists or if args.force is True)
436 #    copy_collection copies all of the collection data blocks from src
437 #    to dst.
438 #
439 #    For this application, it is critical to preserve the
440 #    collection's manifest hash, which is not guaranteed with the
441 #    arvados.CollectionReader and arvados.CollectionWriter classes.
442 #    Copying each block in the collection manually, followed by
443 #    the manifest block, ensures that the collection's manifest
444 #    hash will not change.
445 #
446 def copy_collection(obj_uuid, src, dst, args):
447     if arvados.util.keep_locator_pattern.match(obj_uuid):
448         # If the obj_uuid is a portable data hash, it might not be
449         # uniquely identified with a particular collection.  As a
450         # result, it is ambiguous as to what name to use for the copy.
451         # Apply some heuristics to pick which collection to get the
452         # name from.
453         srccol = src.collections().list(
454             filters=[['portable_data_hash', '=', obj_uuid]],
455             order="created_at asc"
456             ).execute(num_retries=args.retries)
457
458         items = srccol.get("items")
459
460         if not items:
461             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
462             return
463
464         c = None
465
466         if len(items) == 1:
467             # There's only one collection with the PDH, so use that.
468             c = items[0]
469         if not c:
470             # See if there is a collection that's in the same project
471             # as the root item (usually a pipeline) being copied.
472             for i in items:
473                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
474                     c = i
475                     break
476         if not c:
477             # Didn't find any collections located in the same project, so
478             # pick the oldest collection that has a name assigned to it.
479             for i in items:
480                 if i.get("name"):
481                     c = i
482                     break
483         if not c:
484             # None of the collections have names (?!), so just pick the
485             # first one.
486             c = items[0]
487
488         # list() doesn't return manifest text (and we don't want it to,
489         # because we don't need the same maninfest text sent to us 50
490         # times) so go and retrieve the collection object directly
491         # which will include the manifest text.
492         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
493     else:
494         # Assume this is an actual collection uuid, so fetch it directly.
495         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
496
497     # If a collection with this hash already exists at the
498     # destination, and 'force' is not true, just return that
499     # collection.
500     if not args.force:
501         if 'portable_data_hash' in c:
502             colhash = c['portable_data_hash']
503         else:
504             colhash = c['uuid']
505         dstcol = dst.collections().list(
506             filters=[['portable_data_hash', '=', colhash]]
507         ).execute(num_retries=args.retries)
508         if dstcol['items_available'] > 0:
509             for d in dstcol['items']:
510                 if ((args.project_uuid == d['owner_uuid']) and
511                     (c.get('name') == d['name']) and
512                     (c['portable_data_hash'] == d['portable_data_hash'])):
513                     return d
514             c['manifest_text'] = dst.collections().get(
515                 uuid=dstcol['items'][0]['uuid']
516             ).execute(num_retries=args.retries)['manifest_text']
517             return create_collection_from(c, src, dst, args)
518
519     # Fetch the collection's manifest.
520     manifest = c['manifest_text']
521     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
522
523     # Copy each block from src_keep to dst_keep.
524     # Use the newly signed locators returned from dst_keep to build
525     # a new manifest as we go.
526     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
527     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
528     dst_manifest = ""
529     dst_locators = {}
530     bytes_written = 0
531     bytes_expected = total_collection_size(manifest)
532     if args.progress:
533         progress_writer = ProgressWriter(human_progress)
534     else:
535         progress_writer = None
536
537     for line in manifest.splitlines():
538         words = line.split()
539         dst_manifest += words[0]
540         for word in words[1:]:
541             try:
542                 loc = arvados.KeepLocator(word)
543             except ValueError:
544                 # If 'word' can't be parsed as a locator,
545                 # presume it's a filename.
546                 dst_manifest += ' ' + word
547                 continue
548             blockhash = loc.md5sum
549             # copy this block if we haven't seen it before
550             # (otherwise, just reuse the existing dst_locator)
551             if blockhash not in dst_locators:
552                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
553                 if progress_writer:
554                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
555                 data = src_keep.get(word)
556                 dst_locator = dst_keep.put(data)
557                 dst_locators[blockhash] = dst_locator
558                 bytes_written += loc.size
559             dst_manifest += ' ' + dst_locators[blockhash]
560         dst_manifest += "\n"
561
562     if progress_writer:
563         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
564         progress_writer.finish()
565
566     # Copy the manifest and save the collection.
567     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
568
569     c['manifest_text'] = dst_manifest
570     return create_collection_from(c, src, dst, args)
571
572 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
573     r = api.repositories().list(
574         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
575     if r['items_available'] != 1:
576         raise Exception('cannot identify repo {}; {} repos found'
577                         .format(repo_name, r['items_available']))
578
579     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
580     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
581     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
582
583     priority = https_url + other_url + http_url
584
585     git_config = []
586     git_url = None
587     for url in priority:
588         if url.startswith("http"):
589             u = urllib.parse.urlsplit(url)
590             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
591             git_config = ["-c", "credential.%s/.username=none" % baseurl,
592                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
593         else:
594             git_config = []
595
596         try:
597             logger.debug("trying %s", url)
598             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
599                                       env={"HOME": os.environ["HOME"],
600                                            "ARVADOS_API_TOKEN": api.api_token,
601                                            "GIT_ASKPASS": "/bin/false"})
602         except arvados.errors.CommandFailedError:
603             pass
604         else:
605             git_url = url
606             break
607
608     if not git_url:
609         raise Exception('Cannot access git repository, tried {}'
610                         .format(priority))
611
612     if git_url.startswith("http:"):
613         if allow_insecure_http:
614             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
615         else:
616             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
617
618     return (git_url, git_config)
619
620
621 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
622 #
623 #    Copies commits from git repository 'src_git_repo' on Arvados
624 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
625 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
626 #    or "jsmith")
627 #
628 #    All commits will be copied to a destination branch named for the
629 #    source repository URL.
630 #
631 #    The destination repository must already exist.
632 #
633 #    The user running this command must be authenticated
634 #    to both repositories.
635 #
636 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
637     # Identify the fetch and push URLs for the git repositories.
638
639     (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
640     (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")
641
642     logger.debug('src_git_url: {}'.format(src_git_url))
643     logger.debug('dst_git_url: {}'.format(dst_git_url))
644
645     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
646
647     # Copy git commits from src repo to dst repo.
648     if src_git_repo not in local_repo_dir:
649         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
650         arvados.util.run_command(
651             ["git"] + src_git_config + ["clone", "--bare", src_git_url,
652              local_repo_dir[src_git_repo]],
653             cwd=os.path.dirname(local_repo_dir[src_git_repo]),
654             env={"HOME": os.environ["HOME"],
655                  "ARVADOS_API_TOKEN": src.api_token,
656                  "GIT_ASKPASS": "/bin/false"})
657         arvados.util.run_command(
658             ["git", "remote", "add", "dst", dst_git_url],
659             cwd=local_repo_dir[src_git_repo])
660     arvados.util.run_command(
661         ["git", "branch", dst_branch, script_version],
662         cwd=local_repo_dir[src_git_repo])
663     arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
664                              cwd=local_repo_dir[src_git_repo],
665                              env={"HOME": os.environ["HOME"],
666                                   "ARVADOS_API_TOKEN": dst.api_token,
667                                   "GIT_ASKPASS": "/bin/false"})
668
669
670 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
671     """Copy the docker image identified by docker_image and
672     docker_image_tag from src to dst. Create appropriate
673     docker_image_repo+tag and docker_image_hash links at dst.
674
675     """
676
677     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
678
679     # Find the link identifying this docker image.
680     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
681         src, args.retries, docker_image, docker_image_tag)
682     if docker_image_list:
683         image_uuid, image_info = docker_image_list[0]
684         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
685
686         # Copy the collection it refers to.
687         dst_image_col = copy_collection(image_uuid, src, dst, args)
688     elif arvados.util.keep_locator_pattern.match(docker_image):
689         dst_image_col = copy_collection(docker_image, src, dst, args)
690     else:
691         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
692
693 # git_rev_parse(rev, repo)
694 #
695 #    Returns the 40-character commit hash corresponding to 'rev' in
696 #    git repository 'repo' (which must be the path of a local git
697 #    repository)
698 #
699 def git_rev_parse(rev, repo):
700     gitout, giterr = arvados.util.run_command(
701         ['git', 'rev-parse', rev], cwd=repo)
702     return gitout.strip()
703
704 # uuid_type(api, object_uuid)
705 #
706 #    Returns the name of the class that object_uuid belongs to, based on
707 #    the second field of the uuid.  This function consults the api's
708 #    schema to identify the object class.
709 #
710 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
711 #
712 #    Special case: if handed a Keep locator hash, return 'Collection'.
713 #
714 def uuid_type(api, object_uuid):
715     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
716         return 'Collection'
717     p = object_uuid.split('-')
718     if len(p) == 3:
719         type_prefix = p[1]
720         for k in api._schema.schemas:
721             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
722             if type_prefix == obj_class:
723                 return k
724     return None
725
726 def abort(msg, code=1):
727     logger.info("arv-copy: %s", msg)
728     exit(code)
729
730
731 # Code for reporting on the progress of a collection upload.
732 # Stolen from arvados.commands.put.ArvPutCollectionWriter
733 # TODO(twp): figure out how to refactor into a shared library
734 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
735 # code)
736
737 def machine_progress(obj_uuid, bytes_written, bytes_expected):
738     return "{} {}: {} {} written {} total\n".format(
739         sys.argv[0],
740         os.getpid(),
741         obj_uuid,
742         bytes_written,
743         -1 if (bytes_expected is None) else bytes_expected)
744
745 def human_progress(obj_uuid, bytes_written, bytes_expected):
746     if bytes_expected:
747         return "\r{}: {}M / {}M {:.1%} ".format(
748             obj_uuid,
749             bytes_written >> 20, bytes_expected >> 20,
750             float(bytes_written) / bytes_expected)
751     else:
752         return "\r{}: {} ".format(obj_uuid, bytes_written)
753
754 class ProgressWriter(object):
755     _progress_func = None
756     outfile = sys.stderr
757
758     def __init__(self, progress_func):
759         self._progress_func = progress_func
760
761     def report(self, obj_uuid, bytes_written, bytes_expected):
762         if self._progress_func is not None:
763             self.outfile.write(
764                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
765
766     def finish(self):
767         self.outfile.write("\n")
768
769 if __name__ == '__main__':
770     main()