aee71195dd79891da9c3958d91706f2ac2f7202a
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import sys
34 import logging
35 import tempfile
36 import urllib.parse
37
38 import arvados
39 import arvados.config
40 import arvados.keep
41 import arvados.util
42 import arvados.commands._util as arv_cmd
43 import arvados.commands.keepdocker
44 import ruamel.yaml as yaml
45
46 from arvados.api import OrderedJsonModel
47 from arvados._version import __version__
48
49 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
50
51 logger = logging.getLogger('arvados.arv-copy')
52
53 # local_repo_dir records which git repositories from the Arvados source
54 # instance have been checked out locally during this run, and to which
55 # directories.
56 # e.g. if repository 'twp' from src_arv has been cloned into
57 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
58 #
59 local_repo_dir = {}
60
61 # List of collections that have been copied in this session, and their
62 # destination collection UUIDs.
63 collections_copied = {}
64
65 # Set of (repository, script_version) two-tuples of commits copied in git.
66 scripts_copied = set()
67
68 # The owner_uuid of the object being copied
69 src_owner_uuid = None
70
71 def main():
72     copy_opts = argparse.ArgumentParser(add_help=False)
73
74     copy_opts.add_argument(
75         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
76         help='Print version and exit.')
77     copy_opts.add_argument(
78         '-v', '--verbose', dest='verbose', action='store_true',
79         help='Verbose output.')
80     copy_opts.add_argument(
81         '--progress', dest='progress', action='store_true',
82         help='Report progress on copying collections. (default)')
83     copy_opts.add_argument(
84         '--no-progress', dest='progress', action='store_false',
85         help='Do not report progress on copying collections.')
86     copy_opts.add_argument(
87         '-f', '--force', dest='force', action='store_true',
88         help='Perform copy even if the object appears to exist at the remote destination.')
89     copy_opts.add_argument(
90         '--src', dest='source_arvados',
91         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
92     copy_opts.add_argument(
93         '--dst', dest='destination_arvados',
94         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
95     copy_opts.add_argument(
96         '--recursive', dest='recursive', action='store_true',
97         help='Recursively copy any dependencies for this object, and subprojects. (default)')
98     copy_opts.add_argument(
99         '--no-recursive', dest='recursive', action='store_false',
100         help='Do not copy any dependencies or subprojects.')
101     copy_opts.add_argument(
102         '--project-uuid', dest='project_uuid',
103         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
104
105     copy_opts.add_argument(
106         'object_uuid',
107         help='The UUID of the object to be copied.')
108     copy_opts.set_defaults(progress=True)
109     copy_opts.set_defaults(recursive=True)
110
111     parser = argparse.ArgumentParser(
112         description='Copy a workflow or collection from one Arvados instance to another.',
113         parents=[copy_opts, arv_cmd.retry_opt])
114     args = parser.parse_args()
115
116     if args.verbose:
117         logger.setLevel(logging.DEBUG)
118     else:
119         logger.setLevel(logging.INFO)
120
121     if not args.source_arvados:
122         args.source_arvados = args.object_uuid[:5]
123
124     # Create API clients for the source and destination instances
125     src_arv = api_for_instance(args.source_arvados)
126     dst_arv = api_for_instance(args.destination_arvados)
127
128     if not args.project_uuid:
129         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
130
131     # Identify the kind of object we have been given, and begin copying.
132     t = uuid_type(src_arv, args.object_uuid)
133     if t == 'Collection':
134         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
135         result = copy_collection(args.object_uuid,
136                                  src_arv, dst_arv,
137                                  args)
138     elif t == 'Workflow':
139         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
140         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
141     elif t == 'Group':
142         set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
143         result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
144     else:
145         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
146
147     # Clean up any outstanding temp git repositories.
148     for d in listvalues(local_repo_dir):
149         shutil.rmtree(d, ignore_errors=True)
150
151     # If no exception was thrown and the response does not have an
152     # error_token field, presume success
153     if 'error_token' in result or 'uuid' not in result:
154         logger.error("API server returned an error result: {}".format(result))
155         exit(1)
156
157     logger.info("")
158     logger.info("Success: created copy with uuid {}".format(result['uuid']))
159     exit(0)
160
161 def set_src_owner_uuid(resource, uuid, args):
162     global src_owner_uuid
163     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
164     src_owner_uuid = c.get("owner_uuid")
165
166 # api_for_instance(instance_name)
167 #
168 #     Creates an API client for the Arvados instance identified by
169 #     instance_name.
170 #
171 #     If instance_name contains a slash, it is presumed to be a path
172 #     (either local or absolute) to a file with Arvados configuration
173 #     settings.
174 #
175 #     Otherwise, it is presumed to be the name of a file in
176 #     $HOME/.config/arvados/instance_name.conf
177 #
178 def api_for_instance(instance_name):
179     if not instance_name:
180         # Use environment
181         return arvados.api('v1', model=OrderedJsonModel())
182
183     if '/' in instance_name:
184         config_file = instance_name
185     else:
186         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
187
188     try:
189         cfg = arvados.config.load(config_file)
190     except (IOError, OSError) as e:
191         abort(("Could not open config file {}: {}\n" +
192                "You must make sure that your configuration tokens\n" +
193                "for Arvados instance {} are in {} and that this\n" +
194                "file is readable.").format(
195                    config_file, e, instance_name, config_file))
196
197     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
198         api_is_insecure = (
199             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
200                 ['1', 't', 'true', 'y', 'yes']))
201         client = arvados.api('v1',
202                              host=cfg['ARVADOS_API_HOST'],
203                              token=cfg['ARVADOS_API_TOKEN'],
204                              insecure=api_is_insecure,
205                              model=OrderedJsonModel())
206     else:
207         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
208     return client
209
210 # Check if git is available
211 def check_git_availability():
212     try:
213         arvados.util.run_command(['git', '--help'])
214     except Exception:
215         abort('git command is not available. Please ensure git is installed.')
216
217
218 def filter_iter(arg):
219     """Iterate a filter string-or-list.
220
221     Pass in a filter field that can either be a string or list.
222     This will iterate elements as if the field had been written as a list.
223     """
224     if isinstance(arg, basestring):
225         return iter((arg,))
226     else:
227         return iter(arg)
228
229 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
230     """Update a single repository filter in-place for the destination.
231
232     If the filter checks that the repository is src_repository, it is
233     updated to check that the repository is dst_repository.  If it does
234     anything else, this function raises ValueError.
235     """
236     if src_repository is None:
237         raise ValueError("component does not specify a source repository")
238     elif dst_repository is None:
239         raise ValueError("no destination repository specified to update repository filter")
240     elif repo_filter[1:] == ['=', src_repository]:
241         repo_filter[2] = dst_repository
242     elif repo_filter[1:] == ['in', [src_repository]]:
243         repo_filter[2] = [dst_repository]
244     else:
245         raise ValueError("repository filter is not a simple source match")
246
247 def migrate_script_version_filter(version_filter):
248     """Update a single script_version filter in-place for the destination.
249
250     Currently this function checks that all the filter operands are Git
251     commit hashes.  If they're not, it raises ValueError to indicate that
252     the filter is not portable.  It could be extended to make other
253     transformations in the future.
254     """
255     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
256         raise ValueError("script_version filter is not limited to commit hashes")
257
258 def attr_filtered(filter_, *attr_names):
259     """Return True if filter_ applies to any of attr_names, else False."""
260     return any((name == 'any') or (name in attr_names)
261                for name in filter_iter(filter_[0]))
262
263 @contextlib.contextmanager
264 def exception_handler(handler, *exc_types):
265     """If any exc_types are raised in the block, call handler on the exception."""
266     try:
267         yield
268     except exc_types as error:
269         handler(error)
270
271
272 # copy_workflow(wf_uuid, src, dst, args)
273 #
274 #    Copies a workflow identified by wf_uuid from src to dst.
275 #
276 #    If args.recursive is True, also copy any collections
277 #      referenced in the workflow definition yaml.
278 #
279 #    The owner_uuid of the new workflow is set to any given
280 #      project_uuid or the user who copied the template.
281 #
282 #    Returns the copied workflow object.
283 #
284 def copy_workflow(wf_uuid, src, dst, args):
285     # fetch the workflow from the source instance
286     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
287
288     # copy collections and docker images
289     if args.recursive:
290         wf_def = yaml.safe_load(wf["definition"])
291         if wf_def is not None:
292             locations = []
293             docker_images = {}
294             graph = wf_def.get('$graph', None)
295             if graph is not None:
296                 workflow_collections(graph, locations, docker_images)
297             else:
298                 workflow_collections(wf_def, locations, docker_images)
299
300             if locations:
301                 copy_collections(locations, src, dst, args)
302
303             for image in docker_images:
304                 copy_docker_image(image, docker_images[image], src, dst, args)
305
306     # copy the workflow itself
307     del wf['uuid']
308     wf['owner_uuid'] = args.project_uuid
309
310     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
311                                              ["name", "=", wf["name"]]]).execute()
312     if len(existing["items"]) == 0:
313         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
314     else:
315         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
316
317
318 def workflow_collections(obj, locations, docker_images):
319     if isinstance(obj, dict):
320         loc = obj.get('location', None)
321         if loc is not None:
322             if loc.startswith("keep:"):
323                 locations.append(loc[5:])
324
325         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
326         if docker_image is not None:
327             ds = docker_image.split(":", 1)
328             tag = ds[1] if len(ds)==2 else 'latest'
329             docker_images[ds[0]] = tag
330
331         for x in obj:
332             workflow_collections(obj[x], locations, docker_images)
333     elif isinstance(obj, list):
334         for x in obj:
335             workflow_collections(x, locations, docker_images)
336
337 # copy_collections(obj, src, dst, args)
338 #
339 #    Recursively copies all collections referenced by 'obj' from src
340 #    to dst.  obj may be a dict or a list, in which case we run
341 #    copy_collections on every value it contains. If it is a string,
342 #    search it for any substring that matches a collection hash or uuid
343 #    (this will find hidden references to collections like
344 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
345 #
346 #    Returns a copy of obj with any old collection uuids replaced by
347 #    the new ones.
348 #
349 def copy_collections(obj, src, dst, args):
350
351     def copy_collection_fn(collection_match):
352         """Helper function for regex substitution: copies a single collection,
353         identified by the collection_match MatchObject, to the
354         destination.  Returns the destination collection uuid (or the
355         portable data hash if that's what src_id is).
356
357         """
358         src_id = collection_match.group(0)
359         if src_id not in collections_copied:
360             dst_col = copy_collection(src_id, src, dst, args)
361             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
362                 collections_copied[src_id] = src_id
363             else:
364                 collections_copied[src_id] = dst_col['uuid']
365         return collections_copied[src_id]
366
367     if isinstance(obj, basestring):
368         # Copy any collections identified in this string to dst, replacing
369         # them with the dst uuids as necessary.
370         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
371         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
372         return obj
373     elif isinstance(obj, dict):
374         return type(obj)((v, copy_collections(obj[v], src, dst, args))
375                          for v in obj)
376     elif isinstance(obj, list):
377         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
378     return obj
379
380
381 def total_collection_size(manifest_text):
382     """Return the total number of bytes in this collection (excluding
383     duplicate blocks)."""
384
385     total_bytes = 0
386     locators_seen = {}
387     for line in manifest_text.splitlines():
388         words = line.split()
389         for word in words[1:]:
390             try:
391                 loc = arvados.KeepLocator(word)
392             except ValueError:
393                 continue  # this word isn't a locator, skip it
394             if loc.md5sum not in locators_seen:
395                 locators_seen[loc.md5sum] = True
396                 total_bytes += loc.size
397
398     return total_bytes
399
400 def create_collection_from(c, src, dst, args):
401     """Create a new collection record on dst, and copy Docker metadata if
402     available."""
403
404     collection_uuid = c['uuid']
405     body = {}
406     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
407         body[d] = c[d]
408
409     if not body["name"]:
410         body['name'] = "copied from " + collection_uuid
411
412     body['owner_uuid'] = args.project_uuid
413
414     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
415
416     # Create docker_image_repo+tag and docker_image_hash links
417     # at the destination.
418     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
419         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
420
421         for src_link in docker_links:
422             body = {key: src_link[key]
423                     for key in ['link_class', 'name', 'properties']}
424             body['head_uuid'] = dst_collection['uuid']
425             body['owner_uuid'] = args.project_uuid
426
427             lk = dst.links().create(body=body).execute(num_retries=args.retries)
428             logger.debug('created dst link {}'.format(lk))
429
430     return dst_collection
431
432 # copy_collection(obj_uuid, src, dst, args)
433 #
434 #    Copies the collection identified by obj_uuid from src to dst.
435 #    Returns the collection object created at dst.
436 #
437 #    If args.progress is True, produce a human-friendly progress
438 #    report.
439 #
440 #    If a collection with the desired portable_data_hash already
441 #    exists at dst, and args.force is False, copy_collection returns
442 #    the existing collection without copying any blocks.  Otherwise
443 #    (if no collection exists or if args.force is True)
444 #    copy_collection copies all of the collection data blocks from src
445 #    to dst.
446 #
447 #    For this application, it is critical to preserve the
448 #    collection's manifest hash, which is not guaranteed with the
449 #    arvados.CollectionReader and arvados.CollectionWriter classes.
450 #    Copying each block in the collection manually, followed by
451 #    the manifest block, ensures that the collection's manifest
452 #    hash will not change.
453 #
454 def copy_collection(obj_uuid, src, dst, args):
455     if arvados.util.keep_locator_pattern.match(obj_uuid):
456         # If the obj_uuid is a portable data hash, it might not be
457         # uniquely identified with a particular collection.  As a
458         # result, it is ambiguous as to what name to use for the copy.
459         # Apply some heuristics to pick which collection to get the
460         # name from.
461         srccol = src.collections().list(
462             filters=[['portable_data_hash', '=', obj_uuid]],
463             order="created_at asc"
464             ).execute(num_retries=args.retries)
465
466         items = srccol.get("items")
467
468         if not items:
469             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
470             return
471
472         c = None
473
474         if len(items) == 1:
475             # There's only one collection with the PDH, so use that.
476             c = items[0]
477         if not c:
478             # See if there is a collection that's in the same project
479             # as the root item (usually a workflow) being copied.
480             for i in items:
481                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
482                     c = i
483                     break
484         if not c:
485             # Didn't find any collections located in the same project, so
486             # pick the oldest collection that has a name assigned to it.
487             for i in items:
488                 if i.get("name"):
489                     c = i
490                     break
491         if not c:
492             # None of the collections have names (?!), so just pick the
493             # first one.
494             c = items[0]
495
496         # list() doesn't return manifest text (and we don't want it to,
497         # because we don't need the same maninfest text sent to us 50
498         # times) so go and retrieve the collection object directly
499         # which will include the manifest text.
500         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
501     else:
502         # Assume this is an actual collection uuid, so fetch it directly.
503         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
504
505     # If a collection with this hash already exists at the
506     # destination, and 'force' is not true, just return that
507     # collection.
508     if not args.force:
509         if 'portable_data_hash' in c:
510             colhash = c['portable_data_hash']
511         else:
512             colhash = c['uuid']
513         dstcol = dst.collections().list(
514             filters=[['portable_data_hash', '=', colhash]]
515         ).execute(num_retries=args.retries)
516         if dstcol['items_available'] > 0:
517             for d in dstcol['items']:
518                 if ((args.project_uuid == d['owner_uuid']) and
519                     (c.get('name') == d['name']) and
520                     (c['portable_data_hash'] == d['portable_data_hash'])):
521                     return d
522             c['manifest_text'] = dst.collections().get(
523                 uuid=dstcol['items'][0]['uuid']
524             ).execute(num_retries=args.retries)['manifest_text']
525             return create_collection_from(c, src, dst, args)
526
527     # Fetch the collection's manifest.
528     manifest = c['manifest_text']
529     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
530
531     # Copy each block from src_keep to dst_keep.
532     # Use the newly signed locators returned from dst_keep to build
533     # a new manifest as we go.
534     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
535     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
536     dst_manifest = ""
537     dst_locators = {}
538     bytes_written = 0
539     bytes_expected = total_collection_size(manifest)
540     if args.progress:
541         progress_writer = ProgressWriter(human_progress)
542     else:
543         progress_writer = None
544
545     for line in manifest.splitlines():
546         words = line.split()
547         dst_manifest += words[0]
548         for word in words[1:]:
549             try:
550                 loc = arvados.KeepLocator(word)
551             except ValueError:
552                 # If 'word' can't be parsed as a locator,
553                 # presume it's a filename.
554                 dst_manifest += ' ' + word
555                 continue
556             blockhash = loc.md5sum
557             # copy this block if we haven't seen it before
558             # (otherwise, just reuse the existing dst_locator)
559             if blockhash not in dst_locators:
560                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
561                 if progress_writer:
562                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
563                 data = src_keep.get(word)
564                 dst_locator = dst_keep.put(data)
565                 dst_locators[blockhash] = dst_locator
566                 bytes_written += loc.size
567             dst_manifest += ' ' + dst_locators[blockhash]
568         dst_manifest += "\n"
569
570     if progress_writer:
571         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
572         progress_writer.finish()
573
574     # Copy the manifest and save the collection.
575     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
576
577     c['manifest_text'] = dst_manifest
578     return create_collection_from(c, src, dst, args)
579
580 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
581     r = api.repositories().list(
582         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
583     if r['items_available'] != 1:
584         raise Exception('cannot identify repo {}; {} repos found'
585                         .format(repo_name, r['items_available']))
586
587     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
588     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
589     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
590
591     priority = https_url + other_url + http_url
592
593     git_config = []
594     git_url = None
595     for url in priority:
596         if url.startswith("http"):
597             u = urllib.parse.urlsplit(url)
598             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
599             git_config = ["-c", "credential.%s/.username=none" % baseurl,
600                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
601         else:
602             git_config = []
603
604         try:
605             logger.debug("trying %s", url)
606             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
607                                       env={"HOME": os.environ["HOME"],
608                                            "ARVADOS_API_TOKEN": api.api_token,
609                                            "GIT_ASKPASS": "/bin/false"})
610         except arvados.errors.CommandFailedError:
611             pass
612         else:
613             git_url = url
614             break
615
616     if not git_url:
617         raise Exception('Cannot access git repository, tried {}'
618                         .format(priority))
619
620     if git_url.startswith("http:"):
621         if allow_insecure_http:
622             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
623         else:
624             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
625
626     return (git_url, git_config)
627
628
629 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
630     """Copy the docker image identified by docker_image and
631     docker_image_tag from src to dst. Create appropriate
632     docker_image_repo+tag and docker_image_hash links at dst.
633
634     """
635
636     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
637
638     # Find the link identifying this docker image.
639     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
640         src, args.retries, docker_image, docker_image_tag)
641     if docker_image_list:
642         image_uuid, image_info = docker_image_list[0]
643         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
644
645         # Copy the collection it refers to.
646         dst_image_col = copy_collection(image_uuid, src, dst, args)
647     elif arvados.util.keep_locator_pattern.match(docker_image):
648         dst_image_col = copy_collection(docker_image, src, dst, args)
649     else:
650         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
651
652 def copy_project(obj_uuid, src, dst, owner_uuid, args):
653
654     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
655
656     # Create/update the destination project
657     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
658                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
659     if len(existing["items"]) == 0:
660         project_record = dst.groups().create(body={"group": {"group_class": "project",
661                                                              "owner_uuid": owner_uuid,
662                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
663     else:
664         project_record = existing["items"][0]
665
666     dst.groups().update(uuid=project_record["uuid"],
667                         body={"group": {
668                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
669
670     args.project_uuid = project_record["uuid"]
671
672     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
673
674     # Copy collections
675     copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
676                      src, dst, args)
677
678     # Copy workflows
679     for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
680         copy_workflow(w["uuid"], src, dst, args)
681
682     if args.recursive:
683         for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
684             copy_project(g["uuid"], src, dst, project_record["uuid"], args)
685
686     return project_record
687
688 # git_rev_parse(rev, repo)
689 #
690 #    Returns the 40-character commit hash corresponding to 'rev' in
691 #    git repository 'repo' (which must be the path of a local git
692 #    repository)
693 #
694 def git_rev_parse(rev, repo):
695     gitout, giterr = arvados.util.run_command(
696         ['git', 'rev-parse', rev], cwd=repo)
697     return gitout.strip()
698
699 # uuid_type(api, object_uuid)
700 #
701 #    Returns the name of the class that object_uuid belongs to, based on
702 #    the second field of the uuid.  This function consults the api's
703 #    schema to identify the object class.
704 #
705 #    It returns a string such as 'Collection', 'Workflow', etc.
706 #
707 #    Special case: if handed a Keep locator hash, return 'Collection'.
708 #
709 def uuid_type(api, object_uuid):
710     if re.match(arvados.util.keep_locator_pattern, object_uuid):
711         return 'Collection'
712     p = object_uuid.split('-')
713     if len(p) == 3:
714         type_prefix = p[1]
715         for k in api._schema.schemas:
716             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
717             if type_prefix == obj_class:
718                 return k
719     return None
720
721 def abort(msg, code=1):
722     logger.info("arv-copy: %s", msg)
723     exit(code)
724
725
726 # Code for reporting on the progress of a collection upload.
727 # Stolen from arvados.commands.put.ArvPutCollectionWriter
728 # TODO(twp): figure out how to refactor into a shared library
729 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
730 # code)
731
732 def machine_progress(obj_uuid, bytes_written, bytes_expected):
733     return "{} {}: {} {} written {} total\n".format(
734         sys.argv[0],
735         os.getpid(),
736         obj_uuid,
737         bytes_written,
738         -1 if (bytes_expected is None) else bytes_expected)
739
740 def human_progress(obj_uuid, bytes_written, bytes_expected):
741     if bytes_expected:
742         return "\r{}: {}M / {}M {:.1%} ".format(
743             obj_uuid,
744             bytes_written >> 20, bytes_expected >> 20,
745             float(bytes_written) / bytes_expected)
746     else:
747         return "\r{}: {} ".format(obj_uuid, bytes_written)
748
749 class ProgressWriter(object):
750     _progress_func = None
751     outfile = sys.stderr
752
753     def __init__(self, progress_func):
754         self._progress_func = progress_func
755
756     def report(self, obj_uuid, bytes_written, bytes_expected):
757         if self._progress_func is not None:
758             self.outfile.write(
759                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
760
761     def finish(self):
762         self.outfile.write("\n")
763
764 if __name__ == '__main__':
765     main()