93fd6b598aefab0448e450391beecccbb2419f42
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import sys
34 import logging
35 import tempfile
36 import urllib.parse
37 import io
38
39 import arvados
40 import arvados.config
41 import arvados.keep
42 import arvados.util
43 import arvados.commands._util as arv_cmd
44 import arvados.commands.keepdocker
45 import ruamel.yaml as yaml
46
47 from arvados.api import OrderedJsonModel
48 from arvados._version import __version__
49
50 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
51
52 logger = logging.getLogger('arvados.arv-copy')
53
54 # local_repo_dir records which git repositories from the Arvados source
55 # instance have been checked out locally during this run, and to which
56 # directories.
57 # e.g. if repository 'twp' from src_arv has been cloned into
58 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
59 #
60 local_repo_dir = {}
61
62 # List of collections that have been copied in this session, and their
63 # destination collection UUIDs.
64 collections_copied = {}
65
66 # Set of (repository, script_version) two-tuples of commits copied in git.
67 scripts_copied = set()
68
69 # The owner_uuid of the object being copied
70 src_owner_uuid = None
71
72 def main():
73     copy_opts = argparse.ArgumentParser(add_help=False)
74
75     copy_opts.add_argument(
76         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
77         help='Print version and exit.')
78     copy_opts.add_argument(
79         '-v', '--verbose', dest='verbose', action='store_true',
80         help='Verbose output.')
81     copy_opts.add_argument(
82         '--progress', dest='progress', action='store_true',
83         help='Report progress on copying collections. (default)')
84     copy_opts.add_argument(
85         '--no-progress', dest='progress', action='store_false',
86         help='Do not report progress on copying collections.')
87     copy_opts.add_argument(
88         '-f', '--force', dest='force', action='store_true',
89         help='Perform copy even if the object appears to exist at the remote destination.')
90     copy_opts.add_argument(
91         '--src', dest='source_arvados',
92         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
93     copy_opts.add_argument(
94         '--dst', dest='destination_arvados',
95         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
96     copy_opts.add_argument(
97         '--recursive', dest='recursive', action='store_true',
98         help='Recursively copy any dependencies for this object, and subprojects. (default)')
99     copy_opts.add_argument(
100         '--no-recursive', dest='recursive', action='store_false',
101         help='Do not copy any dependencies or subprojects.')
102     copy_opts.add_argument(
103         '--project-uuid', dest='project_uuid',
104         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
105
106     copy_opts.add_argument(
107         'object_uuid',
108         help='The UUID of the object to be copied.')
109     copy_opts.set_defaults(progress=True)
110     copy_opts.set_defaults(recursive=True)
111
112     parser = argparse.ArgumentParser(
113         description='Copy a workflow or collection from one Arvados instance to another.',
114         parents=[copy_opts, arv_cmd.retry_opt])
115     args = parser.parse_args()
116
117     if args.verbose:
118         logger.setLevel(logging.DEBUG)
119     else:
120         logger.setLevel(logging.INFO)
121
122     if not args.source_arvados:
123         args.source_arvados = args.object_uuid[:5]
124
125     # Create API clients for the source and destination instances
126     src_arv = api_for_instance(args.source_arvados)
127     dst_arv = api_for_instance(args.destination_arvados)
128
129     if not args.project_uuid:
130         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
131
132     # Identify the kind of object we have been given, and begin copying.
133     t = uuid_type(src_arv, args.object_uuid)
134     if t == 'Collection':
135         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
136         result = copy_collection(args.object_uuid,
137                                  src_arv, dst_arv,
138                                  args)
139     elif t == 'Workflow':
140         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
141         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
142     elif t == 'Group':
143         set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
144         result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
145     else:
146         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
147
148     # Clean up any outstanding temp git repositories.
149     for d in listvalues(local_repo_dir):
150         shutil.rmtree(d, ignore_errors=True)
151
152     # If no exception was thrown and the response does not have an
153     # error_token field, presume success
154     if 'error_token' in result or 'uuid' not in result:
155         logger.error("API server returned an error result: {}".format(result))
156         exit(1)
157
158     logger.info("")
159     logger.info("Success: created copy with uuid {}".format(result['uuid']))
160     exit(0)
161
162 def set_src_owner_uuid(resource, uuid, args):
163     global src_owner_uuid
164     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
165     src_owner_uuid = c.get("owner_uuid")
166
167 # api_for_instance(instance_name)
168 #
169 #     Creates an API client for the Arvados instance identified by
170 #     instance_name.
171 #
172 #     If instance_name contains a slash, it is presumed to be a path
173 #     (either local or absolute) to a file with Arvados configuration
174 #     settings.
175 #
176 #     Otherwise, it is presumed to be the name of a file in
177 #     $HOME/.config/arvados/instance_name.conf
178 #
179 def api_for_instance(instance_name):
180     if not instance_name:
181         # Use environment
182         return arvados.api('v1', model=OrderedJsonModel())
183
184     if '/' in instance_name:
185         config_file = instance_name
186     else:
187         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
188
189     try:
190         cfg = arvados.config.load(config_file)
191     except (IOError, OSError) as e:
192         abort(("Could not open config file {}: {}\n" +
193                "You must make sure that your configuration tokens\n" +
194                "for Arvados instance {} are in {} and that this\n" +
195                "file is readable.").format(
196                    config_file, e, instance_name, config_file))
197
198     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
199         api_is_insecure = (
200             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
201                 ['1', 't', 'true', 'y', 'yes']))
202         client = arvados.api('v1',
203                              host=cfg['ARVADOS_API_HOST'],
204                              token=cfg['ARVADOS_API_TOKEN'],
205                              insecure=api_is_insecure,
206                              model=OrderedJsonModel())
207     else:
208         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
209     return client
210
211 # Check if git is available
212 def check_git_availability():
213     try:
214         arvados.util.run_command(['git', '--help'])
215     except Exception:
216         abort('git command is not available. Please ensure git is installed.')
217
218
219 def filter_iter(arg):
220     """Iterate a filter string-or-list.
221
222     Pass in a filter field that can either be a string or list.
223     This will iterate elements as if the field had been written as a list.
224     """
225     if isinstance(arg, basestring):
226         return iter((arg,))
227     else:
228         return iter(arg)
229
230 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
231     """Update a single repository filter in-place for the destination.
232
233     If the filter checks that the repository is src_repository, it is
234     updated to check that the repository is dst_repository.  If it does
235     anything else, this function raises ValueError.
236     """
237     if src_repository is None:
238         raise ValueError("component does not specify a source repository")
239     elif dst_repository is None:
240         raise ValueError("no destination repository specified to update repository filter")
241     elif repo_filter[1:] == ['=', src_repository]:
242         repo_filter[2] = dst_repository
243     elif repo_filter[1:] == ['in', [src_repository]]:
244         repo_filter[2] = [dst_repository]
245     else:
246         raise ValueError("repository filter is not a simple source match")
247
248 def migrate_script_version_filter(version_filter):
249     """Update a single script_version filter in-place for the destination.
250
251     Currently this function checks that all the filter operands are Git
252     commit hashes.  If they're not, it raises ValueError to indicate that
253     the filter is not portable.  It could be extended to make other
254     transformations in the future.
255     """
256     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
257         raise ValueError("script_version filter is not limited to commit hashes")
258
259 def attr_filtered(filter_, *attr_names):
260     """Return True if filter_ applies to any of attr_names, else False."""
261     return any((name == 'any') or (name in attr_names)
262                for name in filter_iter(filter_[0]))
263
264 @contextlib.contextmanager
265 def exception_handler(handler, *exc_types):
266     """If any exc_types are raised in the block, call handler on the exception."""
267     try:
268         yield
269     except exc_types as error:
270         handler(error)
271
272
273 # copy_workflow(wf_uuid, src, dst, args)
274 #
275 #    Copies a workflow identified by wf_uuid from src to dst.
276 #
277 #    If args.recursive is True, also copy any collections
278 #      referenced in the workflow definition yaml.
279 #
280 #    The owner_uuid of the new workflow is set to any given
281 #      project_uuid or the user who copied the template.
282 #
283 #    Returns the copied workflow object.
284 #
285 def copy_workflow(wf_uuid, src, dst, args):
286     # fetch the workflow from the source instance
287     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
288
289     # copy collections and docker images
290     if args.recursive:
291         wf_def = yaml.safe_load(wf["definition"])
292         if wf_def is not None:
293             locations = []
294             docker_images = {}
295             graph = wf_def.get('$graph', None)
296             if graph is not None:
297                 workflow_collections(graph, locations, docker_images)
298             else:
299                 workflow_collections(wf_def, locations, docker_images)
300
301             if locations:
302                 copy_collections(locations, src, dst, args)
303
304             for image in docker_images:
305                 copy_docker_image(image, docker_images[image], src, dst, args)
306
307     # copy the workflow itself
308     del wf['uuid']
309     wf['owner_uuid'] = args.project_uuid
310
311     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
312                                              ["name", "=", wf["name"]]]).execute()
313     if len(existing["items"]) == 0:
314         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
315     else:
316         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
317
318
319 def workflow_collections(obj, locations, docker_images):
320     if isinstance(obj, dict):
321         loc = obj.get('location', None)
322         if loc is not None:
323             if loc.startswith("keep:"):
324                 locations.append(loc[5:])
325
326         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
327         if docker_image is not None:
328             ds = docker_image.split(":", 1)
329             tag = ds[1] if len(ds)==2 else 'latest'
330             docker_images[ds[0]] = tag
331
332         for x in obj:
333             workflow_collections(obj[x], locations, docker_images)
334     elif isinstance(obj, list):
335         for x in obj:
336             workflow_collections(x, locations, docker_images)
337
338 # copy_collections(obj, src, dst, args)
339 #
340 #    Recursively copies all collections referenced by 'obj' from src
341 #    to dst.  obj may be a dict or a list, in which case we run
342 #    copy_collections on every value it contains. If it is a string,
343 #    search it for any substring that matches a collection hash or uuid
344 #    (this will find hidden references to collections like
345 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
346 #
347 #    Returns a copy of obj with any old collection uuids replaced by
348 #    the new ones.
349 #
350 def copy_collections(obj, src, dst, args):
351
352     def copy_collection_fn(collection_match):
353         """Helper function for regex substitution: copies a single collection,
354         identified by the collection_match MatchObject, to the
355         destination.  Returns the destination collection uuid (or the
356         portable data hash if that's what src_id is).
357
358         """
359         src_id = collection_match.group(0)
360         if src_id not in collections_copied:
361             dst_col = copy_collection(src_id, src, dst, args)
362             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
363                 collections_copied[src_id] = src_id
364             else:
365                 collections_copied[src_id] = dst_col['uuid']
366         return collections_copied[src_id]
367
368     if isinstance(obj, basestring):
369         # Copy any collections identified in this string to dst, replacing
370         # them with the dst uuids as necessary.
371         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
372         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
373         return obj
374     elif isinstance(obj, dict):
375         return type(obj)((v, copy_collections(obj[v], src, dst, args))
376                          for v in obj)
377     elif isinstance(obj, list):
378         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
379     return obj
380
381
382 def total_collection_size(manifest_text):
383     """Return the total number of bytes in this collection (excluding
384     duplicate blocks)."""
385
386     total_bytes = 0
387     locators_seen = {}
388     for line in manifest_text.splitlines():
389         words = line.split()
390         for word in words[1:]:
391             try:
392                 loc = arvados.KeepLocator(word)
393             except ValueError:
394                 continue  # this word isn't a locator, skip it
395             if loc.md5sum not in locators_seen:
396                 locators_seen[loc.md5sum] = True
397                 total_bytes += loc.size
398
399     return total_bytes
400
401 def create_collection_from(c, src, dst, args):
402     """Create a new collection record on dst, and copy Docker metadata if
403     available."""
404
405     collection_uuid = c['uuid']
406     body = {}
407     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
408         body[d] = c[d]
409
410     if not body["name"]:
411         body['name'] = "copied from " + collection_uuid
412
413     body['owner_uuid'] = args.project_uuid
414
415     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
416
417     # Create docker_image_repo+tag and docker_image_hash links
418     # at the destination.
419     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
420         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
421
422         for src_link in docker_links:
423             body = {key: src_link[key]
424                     for key in ['link_class', 'name', 'properties']}
425             body['head_uuid'] = dst_collection['uuid']
426             body['owner_uuid'] = args.project_uuid
427
428             lk = dst.links().create(body=body).execute(num_retries=args.retries)
429             logger.debug('created dst link {}'.format(lk))
430
431     return dst_collection
432
433 # copy_collection(obj_uuid, src, dst, args)
434 #
435 #    Copies the collection identified by obj_uuid from src to dst.
436 #    Returns the collection object created at dst.
437 #
438 #    If args.progress is True, produce a human-friendly progress
439 #    report.
440 #
441 #    If a collection with the desired portable_data_hash already
442 #    exists at dst, and args.force is False, copy_collection returns
443 #    the existing collection without copying any blocks.  Otherwise
444 #    (if no collection exists or if args.force is True)
445 #    copy_collection copies all of the collection data blocks from src
446 #    to dst.
447 #
448 #    For this application, it is critical to preserve the
449 #    collection's manifest hash, which is not guaranteed with the
450 #    arvados.CollectionReader and arvados.CollectionWriter classes.
451 #    Copying each block in the collection manually, followed by
452 #    the manifest block, ensures that the collection's manifest
453 #    hash will not change.
454 #
455 def copy_collection(obj_uuid, src, dst, args):
456     if arvados.util.keep_locator_pattern.match(obj_uuid):
457         # If the obj_uuid is a portable data hash, it might not be
458         # uniquely identified with a particular collection.  As a
459         # result, it is ambiguous as to what name to use for the copy.
460         # Apply some heuristics to pick which collection to get the
461         # name from.
462         srccol = src.collections().list(
463             filters=[['portable_data_hash', '=', obj_uuid]],
464             order="created_at asc"
465             ).execute(num_retries=args.retries)
466
467         items = srccol.get("items")
468
469         if not items:
470             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
471             return
472
473         c = None
474
475         if len(items) == 1:
476             # There's only one collection with the PDH, so use that.
477             c = items[0]
478         if not c:
479             # See if there is a collection that's in the same project
480             # as the root item (usually a workflow) being copied.
481             for i in items:
482                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
483                     c = i
484                     break
485         if not c:
486             # Didn't find any collections located in the same project, so
487             # pick the oldest collection that has a name assigned to it.
488             for i in items:
489                 if i.get("name"):
490                     c = i
491                     break
492         if not c:
493             # None of the collections have names (?!), so just pick the
494             # first one.
495             c = items[0]
496
497         # list() doesn't return manifest text (and we don't want it to,
498         # because we don't need the same maninfest text sent to us 50
499         # times) so go and retrieve the collection object directly
500         # which will include the manifest text.
501         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
502     else:
503         # Assume this is an actual collection uuid, so fetch it directly.
504         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
505
506     # If a collection with this hash already exists at the
507     # destination, and 'force' is not true, just return that
508     # collection.
509     if not args.force:
510         if 'portable_data_hash' in c:
511             colhash = c['portable_data_hash']
512         else:
513             colhash = c['uuid']
514         dstcol = dst.collections().list(
515             filters=[['portable_data_hash', '=', colhash]]
516         ).execute(num_retries=args.retries)
517         if dstcol['items_available'] > 0:
518             for d in dstcol['items']:
519                 if ((args.project_uuid == d['owner_uuid']) and
520                     (c.get('name') == d['name']) and
521                     (c['portable_data_hash'] == d['portable_data_hash'])):
522                     return d
523             c['manifest_text'] = dst.collections().get(
524                 uuid=dstcol['items'][0]['uuid']
525             ).execute(num_retries=args.retries)['manifest_text']
526             return create_collection_from(c, src, dst, args)
527
528     # Fetch the collection's manifest.
529     manifest = c['manifest_text']
530     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
531
532     # Copy each block from src_keep to dst_keep.
533     # Use the newly signed locators returned from dst_keep to build
534     # a new manifest as we go.
535     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
536     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
537     dst_manifest = io.StringIO()
538     dst_locators = {}
539     bytes_written = 0
540     bytes_expected = total_collection_size(manifest)
541     if args.progress:
542         progress_writer = ProgressWriter(human_progress)
543     else:
544         progress_writer = None
545
546     for line in manifest.splitlines():
547         words = line.split()
548         dst_manifest.write(words[0])
549         for word in words[1:]:
550             try:
551                 loc = arvados.KeepLocator(word)
552             except ValueError:
553                 # If 'word' can't be parsed as a locator,
554                 # presume it's a filename.
555                 dst_manifest.write(' ')
556                 dst_manifest.write(word)
557                 continue
558             blockhash = loc.md5sum
559             # copy this block if we haven't seen it before
560             # (otherwise, just reuse the existing dst_locator)
561             if blockhash not in dst_locators:
562                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
563                 if progress_writer:
564                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
565                 data = src_keep.get(word)
566                 dst_locator = dst_keep.put(data)
567                 dst_locators[blockhash] = dst_locator
568                 bytes_written += loc.size
569             dst_manifest.write(' ')
570             dst_manifest.write(dst_locators[blockhash])
571         dst_manifest.write("\n")
572
573     if progress_writer:
574         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
575         progress_writer.finish()
576
577     # Copy the manifest and save the collection.
578     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
579
580     c['manifest_text'] = dst_manifest.getvalue()
581     return create_collection_from(c, src, dst, args)
582
583 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
584     r = api.repositories().list(
585         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
586     if r['items_available'] != 1:
587         raise Exception('cannot identify repo {}; {} repos found'
588                         .format(repo_name, r['items_available']))
589
590     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
591     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
592     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
593
594     priority = https_url + other_url + http_url
595
596     git_config = []
597     git_url = None
598     for url in priority:
599         if url.startswith("http"):
600             u = urllib.parse.urlsplit(url)
601             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
602             git_config = ["-c", "credential.%s/.username=none" % baseurl,
603                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
604         else:
605             git_config = []
606
607         try:
608             logger.debug("trying %s", url)
609             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
610                                       env={"HOME": os.environ["HOME"],
611                                            "ARVADOS_API_TOKEN": api.api_token,
612                                            "GIT_ASKPASS": "/bin/false"})
613         except arvados.errors.CommandFailedError:
614             pass
615         else:
616             git_url = url
617             break
618
619     if not git_url:
620         raise Exception('Cannot access git repository, tried {}'
621                         .format(priority))
622
623     if git_url.startswith("http:"):
624         if allow_insecure_http:
625             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
626         else:
627             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
628
629     return (git_url, git_config)
630
631
632 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
633     """Copy the docker image identified by docker_image and
634     docker_image_tag from src to dst. Create appropriate
635     docker_image_repo+tag and docker_image_hash links at dst.
636
637     """
638
639     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
640
641     # Find the link identifying this docker image.
642     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
643         src, args.retries, docker_image, docker_image_tag)
644     if docker_image_list:
645         image_uuid, image_info = docker_image_list[0]
646         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
647
648         # Copy the collection it refers to.
649         dst_image_col = copy_collection(image_uuid, src, dst, args)
650     elif arvados.util.keep_locator_pattern.match(docker_image):
651         dst_image_col = copy_collection(docker_image, src, dst, args)
652     else:
653         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
654
655 def copy_project(obj_uuid, src, dst, owner_uuid, args):
656
657     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
658
659     # Create/update the destination project
660     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
661                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
662     if len(existing["items"]) == 0:
663         project_record = dst.groups().create(body={"group": {"group_class": "project",
664                                                              "owner_uuid": owner_uuid,
665                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
666     else:
667         project_record = existing["items"][0]
668
669     dst.groups().update(uuid=project_record["uuid"],
670                         body={"group": {
671                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
672
673     args.project_uuid = project_record["uuid"]
674
675     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
676
677     # Copy collections
678     copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
679                      src, dst, args)
680
681     # Copy workflows
682     for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
683         copy_workflow(w["uuid"], src, dst, args)
684
685     if args.recursive:
686         for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
687             copy_project(g["uuid"], src, dst, project_record["uuid"], args)
688
689     return project_record
690
691 # git_rev_parse(rev, repo)
692 #
693 #    Returns the 40-character commit hash corresponding to 'rev' in
694 #    git repository 'repo' (which must be the path of a local git
695 #    repository)
696 #
697 def git_rev_parse(rev, repo):
698     gitout, giterr = arvados.util.run_command(
699         ['git', 'rev-parse', rev], cwd=repo)
700     return gitout.strip()
701
702 # uuid_type(api, object_uuid)
703 #
704 #    Returns the name of the class that object_uuid belongs to, based on
705 #    the second field of the uuid.  This function consults the api's
706 #    schema to identify the object class.
707 #
708 #    It returns a string such as 'Collection', 'Workflow', etc.
709 #
710 #    Special case: if handed a Keep locator hash, return 'Collection'.
711 #
712 def uuid_type(api, object_uuid):
713     if re.match(arvados.util.keep_locator_pattern, object_uuid):
714         return 'Collection'
715     p = object_uuid.split('-')
716     if len(p) == 3:
717         type_prefix = p[1]
718         for k in api._schema.schemas:
719             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
720             if type_prefix == obj_class:
721                 return k
722     return None
723
724 def abort(msg, code=1):
725     logger.info("arv-copy: %s", msg)
726     exit(code)
727
728
729 # Code for reporting on the progress of a collection upload.
730 # Stolen from arvados.commands.put.ArvPutCollectionWriter
731 # TODO(twp): figure out how to refactor into a shared library
732 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
733 # code)
734
735 def machine_progress(obj_uuid, bytes_written, bytes_expected):
736     return "{} {}: {} {} written {} total\n".format(
737         sys.argv[0],
738         os.getpid(),
739         obj_uuid,
740         bytes_written,
741         -1 if (bytes_expected is None) else bytes_expected)
742
743 def human_progress(obj_uuid, bytes_written, bytes_expected):
744     if bytes_expected:
745         return "\r{}: {}M / {}M {:.1%} ".format(
746             obj_uuid,
747             bytes_written >> 20, bytes_expected >> 20,
748             float(bytes_written) / bytes_expected)
749     else:
750         return "\r{}: {} ".format(obj_uuid, bytes_written)
751
752 class ProgressWriter(object):
753     _progress_func = None
754     outfile = sys.stderr
755
756     def __init__(self, progress_func):
757         self._progress_func = progress_func
758
759     def report(self, obj_uuid, bytes_written, bytes_expected):
760         if self._progress_func is not None:
761             self.outfile.write(
762                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
763
764     def finish(self):
765         self.outfile.write("\n")
766
767 if __name__ == '__main__':
768     main()