17388: Adds storage classes support to arv-copy.
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # arv-copy [--recursive] [--no-recursive] object-uuid
6 #
7 # Copies an object from Arvados instance src to instance dst.
8 #
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
15 #
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst.  If either of these files is not found,
19 # arv-copy will issue an error.
20
21 from __future__ import division
22 from future import standard_library
23 from future.utils import listvalues
24 standard_library.install_aliases()
25 from past.builtins import basestring
26 from builtins import object
27 import argparse
28 import contextlib
29 import getpass
30 import os
31 import re
32 import shutil
33 import sys
34 import logging
35 import tempfile
36 import urllib.parse
37 import io
38
39 import arvados
40 import arvados.config
41 import arvados.keep
42 import arvados.util
43 import arvados.commands._util as arv_cmd
44 import arvados.commands.keepdocker
45 import ruamel.yaml as yaml
46
47 from arvados.api import OrderedJsonModel
48 from arvados._version import __version__
49
50 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
51
52 logger = logging.getLogger('arvados.arv-copy')
53
54 # local_repo_dir records which git repositories from the Arvados source
55 # instance have been checked out locally during this run, and to which
56 # directories.
57 # e.g. if repository 'twp' from src_arv has been cloned into
58 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
59 #
60 local_repo_dir = {}
61
62 # List of collections that have been copied in this session, and their
63 # destination collection UUIDs.
64 collections_copied = {}
65
66 # Set of (repository, script_version) two-tuples of commits copied in git.
67 scripts_copied = set()
68
69 # The owner_uuid of the object being copied
70 src_owner_uuid = None
71
72 def main():
73     copy_opts = argparse.ArgumentParser(add_help=False)
74
75     copy_opts.add_argument(
76         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
77         help='Print version and exit.')
78     copy_opts.add_argument(
79         '-v', '--verbose', dest='verbose', action='store_true',
80         help='Verbose output.')
81     copy_opts.add_argument(
82         '--progress', dest='progress', action='store_true',
83         help='Report progress on copying collections. (default)')
84     copy_opts.add_argument(
85         '--no-progress', dest='progress', action='store_false',
86         help='Do not report progress on copying collections.')
87     copy_opts.add_argument(
88         '-f', '--force', dest='force', action='store_true',
89         help='Perform copy even if the object appears to exist at the remote destination.')
90     copy_opts.add_argument(
91         '--src', dest='source_arvados',
92         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
93     copy_opts.add_argument(
94         '--dst', dest='destination_arvados',
95         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
96     copy_opts.add_argument(
97         '--recursive', dest='recursive', action='store_true',
98         help='Recursively copy any dependencies for this object, and subprojects. (default)')
99     copy_opts.add_argument(
100         '--no-recursive', dest='recursive', action='store_false',
101         help='Do not copy any dependencies or subprojects.')
102     copy_opts.add_argument(
103         '--project-uuid', dest='project_uuid',
104         help='The UUID of the project at the destination to which the collection or workflow should be copied.')
105     copy_opts.add_argument(
106         '--storage-classes', dest='storage_classes',
107         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
108
109     copy_opts.add_argument(
110         'object_uuid',
111         help='The UUID of the object to be copied.')
112     copy_opts.set_defaults(progress=True)
113     copy_opts.set_defaults(recursive=True)
114
115     parser = argparse.ArgumentParser(
116         description='Copy a workflow or collection from one Arvados instance to another.',
117         parents=[copy_opts, arv_cmd.retry_opt])
118     args = parser.parse_args()
119
120     if args.storage_classes:
121         args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
122
123     if args.verbose:
124         logger.setLevel(logging.DEBUG)
125     else:
126         logger.setLevel(logging.INFO)
127
128     if not args.source_arvados:
129         args.source_arvados = args.object_uuid[:5]
130
131     # Create API clients for the source and destination instances
132     src_arv = api_for_instance(args.source_arvados)
133     dst_arv = api_for_instance(args.destination_arvados)
134
135     if not args.project_uuid:
136         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
137
138     # Identify the kind of object we have been given, and begin copying.
139     t = uuid_type(src_arv, args.object_uuid)
140     if t == 'Collection':
141         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
142         result = copy_collection(args.object_uuid,
143                                  src_arv, dst_arv,
144                                  args)
145     elif t == 'Workflow':
146         set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
147         result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
148     elif t == 'Group':
149         set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
150         result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
151     else:
152         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
153
154     # Clean up any outstanding temp git repositories.
155     for d in listvalues(local_repo_dir):
156         shutil.rmtree(d, ignore_errors=True)
157
158     # If no exception was thrown and the response does not have an
159     # error_token field, presume success
160     if 'error_token' in result or 'uuid' not in result:
161         logger.error("API server returned an error result: {}".format(result))
162         exit(1)
163
164     logger.info("")
165     logger.info("Success: created copy with uuid {}".format(result['uuid']))
166     exit(0)
167
168 def set_src_owner_uuid(resource, uuid, args):
169     global src_owner_uuid
170     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
171     src_owner_uuid = c.get("owner_uuid")
172
173 # api_for_instance(instance_name)
174 #
175 #     Creates an API client for the Arvados instance identified by
176 #     instance_name.
177 #
178 #     If instance_name contains a slash, it is presumed to be a path
179 #     (either local or absolute) to a file with Arvados configuration
180 #     settings.
181 #
182 #     Otherwise, it is presumed to be the name of a file in
183 #     $HOME/.config/arvados/instance_name.conf
184 #
185 def api_for_instance(instance_name):
186     if not instance_name:
187         # Use environment
188         return arvados.api('v1', model=OrderedJsonModel())
189
190     if '/' in instance_name:
191         config_file = instance_name
192     else:
193         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
194
195     try:
196         cfg = arvados.config.load(config_file)
197     except (IOError, OSError) as e:
198         abort(("Could not open config file {}: {}\n" +
199                "You must make sure that your configuration tokens\n" +
200                "for Arvados instance {} are in {} and that this\n" +
201                "file is readable.").format(
202                    config_file, e, instance_name, config_file))
203
204     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
205         api_is_insecure = (
206             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
207                 ['1', 't', 'true', 'y', 'yes']))
208         client = arvados.api('v1',
209                              host=cfg['ARVADOS_API_HOST'],
210                              token=cfg['ARVADOS_API_TOKEN'],
211                              insecure=api_is_insecure,
212                              model=OrderedJsonModel())
213     else:
214         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
215     return client
216
217 # Check if git is available
218 def check_git_availability():
219     try:
220         arvados.util.run_command(['git', '--help'])
221     except Exception:
222         abort('git command is not available. Please ensure git is installed.')
223
224
225 def filter_iter(arg):
226     """Iterate a filter string-or-list.
227
228     Pass in a filter field that can either be a string or list.
229     This will iterate elements as if the field had been written as a list.
230     """
231     if isinstance(arg, basestring):
232         return iter((arg,))
233     else:
234         return iter(arg)
235
236 def migrate_repository_filter(repo_filter, src_repository, dst_repository):
237     """Update a single repository filter in-place for the destination.
238
239     If the filter checks that the repository is src_repository, it is
240     updated to check that the repository is dst_repository.  If it does
241     anything else, this function raises ValueError.
242     """
243     if src_repository is None:
244         raise ValueError("component does not specify a source repository")
245     elif dst_repository is None:
246         raise ValueError("no destination repository specified to update repository filter")
247     elif repo_filter[1:] == ['=', src_repository]:
248         repo_filter[2] = dst_repository
249     elif repo_filter[1:] == ['in', [src_repository]]:
250         repo_filter[2] = [dst_repository]
251     else:
252         raise ValueError("repository filter is not a simple source match")
253
254 def migrate_script_version_filter(version_filter):
255     """Update a single script_version filter in-place for the destination.
256
257     Currently this function checks that all the filter operands are Git
258     commit hashes.  If they're not, it raises ValueError to indicate that
259     the filter is not portable.  It could be extended to make other
260     transformations in the future.
261     """
262     if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
263         raise ValueError("script_version filter is not limited to commit hashes")
264
265 def attr_filtered(filter_, *attr_names):
266     """Return True if filter_ applies to any of attr_names, else False."""
267     return any((name == 'any') or (name in attr_names)
268                for name in filter_iter(filter_[0]))
269
270 @contextlib.contextmanager
271 def exception_handler(handler, *exc_types):
272     """If any exc_types are raised in the block, call handler on the exception."""
273     try:
274         yield
275     except exc_types as error:
276         handler(error)
277
278
279 # copy_workflow(wf_uuid, src, dst, args)
280 #
281 #    Copies a workflow identified by wf_uuid from src to dst.
282 #
283 #    If args.recursive is True, also copy any collections
284 #      referenced in the workflow definition yaml.
285 #
286 #    The owner_uuid of the new workflow is set to any given
287 #      project_uuid or the user who copied the template.
288 #
289 #    Returns the copied workflow object.
290 #
291 def copy_workflow(wf_uuid, src, dst, args):
292     # fetch the workflow from the source instance
293     wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
294
295     # copy collections and docker images
296     if args.recursive:
297         wf_def = yaml.safe_load(wf["definition"])
298         if wf_def is not None:
299             locations = []
300             docker_images = {}
301             graph = wf_def.get('$graph', None)
302             if graph is not None:
303                 workflow_collections(graph, locations, docker_images)
304             else:
305                 workflow_collections(wf_def, locations, docker_images)
306
307             if locations:
308                 copy_collections(locations, src, dst, args)
309
310             for image in docker_images:
311                 copy_docker_image(image, docker_images[image], src, dst, args)
312
313     # copy the workflow itself
314     del wf['uuid']
315     wf['owner_uuid'] = args.project_uuid
316
317     existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
318                                              ["name", "=", wf["name"]]]).execute()
319     if len(existing["items"]) == 0:
320         return dst.workflows().create(body=wf).execute(num_retries=args.retries)
321     else:
322         return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
323
324
325 def workflow_collections(obj, locations, docker_images):
326     if isinstance(obj, dict):
327         loc = obj.get('location', None)
328         if loc is not None:
329             if loc.startswith("keep:"):
330                 locations.append(loc[5:])
331
332         docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
333         if docker_image is not None:
334             ds = docker_image.split(":", 1)
335             tag = ds[1] if len(ds)==2 else 'latest'
336             docker_images[ds[0]] = tag
337
338         for x in obj:
339             workflow_collections(obj[x], locations, docker_images)
340     elif isinstance(obj, list):
341         for x in obj:
342             workflow_collections(x, locations, docker_images)
343
344 # copy_collections(obj, src, dst, args)
345 #
346 #    Recursively copies all collections referenced by 'obj' from src
347 #    to dst.  obj may be a dict or a list, in which case we run
348 #    copy_collections on every value it contains. If it is a string,
349 #    search it for any substring that matches a collection hash or uuid
350 #    (this will find hidden references to collections like
351 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
352 #
353 #    Returns a copy of obj with any old collection uuids replaced by
354 #    the new ones.
355 #
356 def copy_collections(obj, src, dst, args):
357
358     def copy_collection_fn(collection_match):
359         """Helper function for regex substitution: copies a single collection,
360         identified by the collection_match MatchObject, to the
361         destination.  Returns the destination collection uuid (or the
362         portable data hash if that's what src_id is).
363
364         """
365         src_id = collection_match.group(0)
366         if src_id not in collections_copied:
367             dst_col = copy_collection(src_id, src, dst, args)
368             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
369                 collections_copied[src_id] = src_id
370             else:
371                 collections_copied[src_id] = dst_col['uuid']
372         return collections_copied[src_id]
373
374     if isinstance(obj, basestring):
375         # Copy any collections identified in this string to dst, replacing
376         # them with the dst uuids as necessary.
377         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
378         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
379         return obj
380     elif isinstance(obj, dict):
381         return type(obj)((v, copy_collections(obj[v], src, dst, args))
382                          for v in obj)
383     elif isinstance(obj, list):
384         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
385     return obj
386
387
388 def total_collection_size(manifest_text):
389     """Return the total number of bytes in this collection (excluding
390     duplicate blocks)."""
391
392     total_bytes = 0
393     locators_seen = {}
394     for line in manifest_text.splitlines():
395         words = line.split()
396         for word in words[1:]:
397             try:
398                 loc = arvados.KeepLocator(word)
399             except ValueError:
400                 continue  # this word isn't a locator, skip it
401             if loc.md5sum not in locators_seen:
402                 locators_seen[loc.md5sum] = True
403                 total_bytes += loc.size
404
405     return total_bytes
406
407 def create_collection_from(c, src, dst, args):
408     """Create a new collection record on dst, and copy Docker metadata if
409     available."""
410
411     collection_uuid = c['uuid']
412     body = {}
413     for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
414         body[d] = c[d]
415
416     if not body["name"]:
417         body['name'] = "copied from " + collection_uuid
418
419     if args.storage_classes:
420         body['storage_classes_desired'] = args.storage_classes
421
422     body['owner_uuid'] = args.project_uuid
423
424     dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
425
426     # Create docker_image_repo+tag and docker_image_hash links
427     # at the destination.
428     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
429         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
430
431         for src_link in docker_links:
432             body = {key: src_link[key]
433                     for key in ['link_class', 'name', 'properties']}
434             body['head_uuid'] = dst_collection['uuid']
435             body['owner_uuid'] = args.project_uuid
436
437             lk = dst.links().create(body=body).execute(num_retries=args.retries)
438             logger.debug('created dst link {}'.format(lk))
439
440     return dst_collection
441
442 # copy_collection(obj_uuid, src, dst, args)
443 #
444 #    Copies the collection identified by obj_uuid from src to dst.
445 #    Returns the collection object created at dst.
446 #
447 #    If args.progress is True, produce a human-friendly progress
448 #    report.
449 #
450 #    If a collection with the desired portable_data_hash already
451 #    exists at dst, and args.force is False, copy_collection returns
452 #    the existing collection without copying any blocks.  Otherwise
453 #    (if no collection exists or if args.force is True)
454 #    copy_collection copies all of the collection data blocks from src
455 #    to dst.
456 #
457 #    For this application, it is critical to preserve the
458 #    collection's manifest hash, which is not guaranteed with the
459 #    arvados.CollectionReader and arvados.CollectionWriter classes.
460 #    Copying each block in the collection manually, followed by
461 #    the manifest block, ensures that the collection's manifest
462 #    hash will not change.
463 #
464 def copy_collection(obj_uuid, src, dst, args):
465     if arvados.util.keep_locator_pattern.match(obj_uuid):
466         # If the obj_uuid is a portable data hash, it might not be
467         # uniquely identified with a particular collection.  As a
468         # result, it is ambiguous as to what name to use for the copy.
469         # Apply some heuristics to pick which collection to get the
470         # name from.
471         srccol = src.collections().list(
472             filters=[['portable_data_hash', '=', obj_uuid]],
473             order="created_at asc"
474             ).execute(num_retries=args.retries)
475
476         items = srccol.get("items")
477
478         if not items:
479             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
480             return
481
482         c = None
483
484         if len(items) == 1:
485             # There's only one collection with the PDH, so use that.
486             c = items[0]
487         if not c:
488             # See if there is a collection that's in the same project
489             # as the root item (usually a workflow) being copied.
490             for i in items:
491                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
492                     c = i
493                     break
494         if not c:
495             # Didn't find any collections located in the same project, so
496             # pick the oldest collection that has a name assigned to it.
497             for i in items:
498                 if i.get("name"):
499                     c = i
500                     break
501         if not c:
502             # None of the collections have names (?!), so just pick the
503             # first one.
504             c = items[0]
505
506         # list() doesn't return manifest text (and we don't want it to,
507         # because we don't need the same maninfest text sent to us 50
508         # times) so go and retrieve the collection object directly
509         # which will include the manifest text.
510         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
511     else:
512         # Assume this is an actual collection uuid, so fetch it directly.
513         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
514
515     # If a collection with this hash already exists at the
516     # destination, and 'force' is not true, just return that
517     # collection.
518     if not args.force:
519         if 'portable_data_hash' in c:
520             colhash = c['portable_data_hash']
521         else:
522             colhash = c['uuid']
523         dstcol = dst.collections().list(
524             filters=[['portable_data_hash', '=', colhash]]
525         ).execute(num_retries=args.retries)
526         if dstcol['items_available'] > 0:
527             for d in dstcol['items']:
528                 if ((args.project_uuid == d['owner_uuid']) and
529                     (c.get('name') == d['name']) and
530                     (c['portable_data_hash'] == d['portable_data_hash'])):
531                     return d
532             c['manifest_text'] = dst.collections().get(
533                 uuid=dstcol['items'][0]['uuid']
534             ).execute(num_retries=args.retries)['manifest_text']
535             return create_collection_from(c, src, dst, args)
536
537     # Fetch the collection's manifest.
538     manifest = c['manifest_text']
539     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
540
541     # Copy each block from src_keep to dst_keep.
542     # Use the newly signed locators returned from dst_keep to build
543     # a new manifest as we go.
544     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
545     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
546     dst_manifest = io.StringIO()
547     dst_locators = {}
548     bytes_written = 0
549     bytes_expected = total_collection_size(manifest)
550     if args.progress:
551         progress_writer = ProgressWriter(human_progress)
552     else:
553         progress_writer = None
554
555     for line in manifest.splitlines():
556         words = line.split()
557         dst_manifest.write(words[0])
558         for word in words[1:]:
559             try:
560                 loc = arvados.KeepLocator(word)
561             except ValueError:
562                 # If 'word' can't be parsed as a locator,
563                 # presume it's a filename.
564                 dst_manifest.write(' ')
565                 dst_manifest.write(word)
566                 continue
567             blockhash = loc.md5sum
568             # copy this block if we haven't seen it before
569             # (otherwise, just reuse the existing dst_locator)
570             if blockhash not in dst_locators:
571                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
572                 if progress_writer:
573                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
574                 data = src_keep.get(word)
575                 dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
576                 dst_locators[blockhash] = dst_locator
577                 bytes_written += loc.size
578             dst_manifest.write(' ')
579             dst_manifest.write(dst_locators[blockhash])
580         dst_manifest.write("\n")
581
582     if progress_writer:
583         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
584         progress_writer.finish()
585
586     # Copy the manifest and save the collection.
587     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
588
589     c['manifest_text'] = dst_manifest.getvalue()
590     return create_collection_from(c, src, dst, args)
591
592 def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
593     r = api.repositories().list(
594         filters=[['name', '=', repo_name]]).execute(num_retries=retries)
595     if r['items_available'] != 1:
596         raise Exception('cannot identify repo {}; {} repos found'
597                         .format(repo_name, r['items_available']))
598
599     https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
600     http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
601     other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
602
603     priority = https_url + other_url + http_url
604
605     git_config = []
606     git_url = None
607     for url in priority:
608         if url.startswith("http"):
609             u = urllib.parse.urlsplit(url)
610             baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
611             git_config = ["-c", "credential.%s/.username=none" % baseurl,
612                           "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
613         else:
614             git_config = []
615
616         try:
617             logger.debug("trying %s", url)
618             arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
619                                       env={"HOME": os.environ["HOME"],
620                                            "ARVADOS_API_TOKEN": api.api_token,
621                                            "GIT_ASKPASS": "/bin/false"})
622         except arvados.errors.CommandFailedError:
623             pass
624         else:
625             git_url = url
626             break
627
628     if not git_url:
629         raise Exception('Cannot access git repository, tried {}'
630                         .format(priority))
631
632     if git_url.startswith("http:"):
633         if allow_insecure_http:
634             logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
635         else:
636             raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
637
638     return (git_url, git_config)
639
640
641 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
642     """Copy the docker image identified by docker_image and
643     docker_image_tag from src to dst. Create appropriate
644     docker_image_repo+tag and docker_image_hash links at dst.
645
646     """
647
648     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
649
650     # Find the link identifying this docker image.
651     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
652         src, args.retries, docker_image, docker_image_tag)
653     if docker_image_list:
654         image_uuid, image_info = docker_image_list[0]
655         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
656
657         # Copy the collection it refers to.
658         dst_image_col = copy_collection(image_uuid, src, dst, args)
659     elif arvados.util.keep_locator_pattern.match(docker_image):
660         dst_image_col = copy_collection(docker_image, src, dst, args)
661     else:
662         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
663
664 def copy_project(obj_uuid, src, dst, owner_uuid, args):
665
666     src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
667
668     # Create/update the destination project
669     existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
670                                           ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
671     if len(existing["items"]) == 0:
672         project_record = dst.groups().create(body={"group": {"group_class": "project",
673                                                              "owner_uuid": owner_uuid,
674                                                              "name": src_project_record["name"]}}).execute(num_retries=args.retries)
675     else:
676         project_record = existing["items"][0]
677
678     dst.groups().update(uuid=project_record["uuid"],
679                         body={"group": {
680                             "description": src_project_record["description"]}}).execute(num_retries=args.retries)
681
682     args.project_uuid = project_record["uuid"]
683
684     logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
685
686     # Copy collections
687     copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
688                      src, dst, args)
689
690     # Copy workflows
691     for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
692         copy_workflow(w["uuid"], src, dst, args)
693
694     if args.recursive:
695         for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
696             copy_project(g["uuid"], src, dst, project_record["uuid"], args)
697
698     return project_record
699
700 # git_rev_parse(rev, repo)
701 #
702 #    Returns the 40-character commit hash corresponding to 'rev' in
703 #    git repository 'repo' (which must be the path of a local git
704 #    repository)
705 #
706 def git_rev_parse(rev, repo):
707     gitout, giterr = arvados.util.run_command(
708         ['git', 'rev-parse', rev], cwd=repo)
709     return gitout.strip()
710
711 # uuid_type(api, object_uuid)
712 #
713 #    Returns the name of the class that object_uuid belongs to, based on
714 #    the second field of the uuid.  This function consults the api's
715 #    schema to identify the object class.
716 #
717 #    It returns a string such as 'Collection', 'Workflow', etc.
718 #
719 #    Special case: if handed a Keep locator hash, return 'Collection'.
720 #
721 def uuid_type(api, object_uuid):
722     if re.match(arvados.util.keep_locator_pattern, object_uuid):
723         return 'Collection'
724     p = object_uuid.split('-')
725     if len(p) == 3:
726         type_prefix = p[1]
727         for k in api._schema.schemas:
728             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
729             if type_prefix == obj_class:
730                 return k
731     return None
732
733 def abort(msg, code=1):
734     logger.info("arv-copy: %s", msg)
735     exit(code)
736
737
738 # Code for reporting on the progress of a collection upload.
739 # Stolen from arvados.commands.put.ArvPutCollectionWriter
740 # TODO(twp): figure out how to refactor into a shared library
741 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
742 # code)
743
744 def machine_progress(obj_uuid, bytes_written, bytes_expected):
745     return "{} {}: {} {} written {} total\n".format(
746         sys.argv[0],
747         os.getpid(),
748         obj_uuid,
749         bytes_written,
750         -1 if (bytes_expected is None) else bytes_expected)
751
752 def human_progress(obj_uuid, bytes_written, bytes_expected):
753     if bytes_expected:
754         return "\r{}: {}M / {}M {:.1%} ".format(
755             obj_uuid,
756             bytes_written >> 20, bytes_expected >> 20,
757             float(bytes_written) / bytes_expected)
758     else:
759         return "\r{}: {} ".format(obj_uuid, bytes_written)
760
761 class ProgressWriter(object):
762     _progress_func = None
763     outfile = sys.stderr
764
765     def __init__(self, progress_func):
766         self._progress_func = progress_func
767
768     def report(self, obj_uuid, bytes_written, bytes_expected):
769         if self._progress_func is not None:
770             self.outfile.write(
771                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
772
773     def finish(self):
774         self.outfile.write("\n")
775
776 if __name__ == '__main__':
777     main()