Merge branch 'master' into 3408-production-datamanager
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31 import arvados.util
32 import arvados.commands._util as arv_cmd
33 import arvados.commands.keepdocker
34
35 from arvados.api import OrderedJsonModel
36
37 logger = logging.getLogger('arvados.arv-copy')
38
39 # local_repo_dir records which git repositories from the Arvados source
40 # instance have been checked out locally during this run, and to which
41 # directories.
42 # e.g. if repository 'twp' from src_arv has been cloned into
43 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
44 #
45 local_repo_dir = {}
46
47 # List of collections that have been copied in this session, and their
48 # destination collection UUIDs.
49 collections_copied = {}
50
51 # Set of (repository, script_version) two-tuples of commits copied in git.
52 scripts_copied = set()
53
54 # The owner_uuid of the object being copied
55 src_owner_uuid = None
56
57 def main():
58     copy_opts = argparse.ArgumentParser(add_help=False)
59
60     copy_opts.add_argument(
61         '-v', '--verbose', dest='verbose', action='store_true',
62         help='Verbose output.')
63     copy_opts.add_argument(
64         '--progress', dest='progress', action='store_true',
65         help='Report progress on copying collections. (default)')
66     copy_opts.add_argument(
67         '--no-progress', dest='progress', action='store_false',
68         help='Do not report progress on copying collections.')
69     copy_opts.add_argument(
70         '-f', '--force', dest='force', action='store_true',
71         help='Perform copy even if the object appears to exist at the remote destination.')
72     copy_opts.add_argument(
73         '--src', dest='source_arvados', required=True,
74         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
75     copy_opts.add_argument(
76         '--dst', dest='destination_arvados', required=True,
77         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
78     copy_opts.add_argument(
79         '--recursive', dest='recursive', action='store_true',
80         help='Recursively copy any dependencies for this object. (default)')
81     copy_opts.add_argument(
82         '--no-recursive', dest='recursive', action='store_false',
83         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
84     copy_opts.add_argument(
85         '--dst-git-repo', dest='dst_git_repo',
86         help='The name of the destination git repository. Required when copying a pipeline recursively.')
87     copy_opts.add_argument(
88         '--project-uuid', dest='project_uuid',
89         help='The UUID of the project at the destination to which the pipeline should be copied.')
90     copy_opts.add_argument(
91         'object_uuid',
92         help='The UUID of the object to be copied.')
93     copy_opts.set_defaults(progress=True)
94     copy_opts.set_defaults(recursive=True)
95
96     parser = argparse.ArgumentParser(
97         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
98         parents=[copy_opts, arv_cmd.retry_opt])
99     args = parser.parse_args()
100
101     if args.verbose:
102         logger.setLevel(logging.DEBUG)
103     else:
104         logger.setLevel(logging.INFO)
105
106     # Create API clients for the source and destination instances
107     src_arv = api_for_instance(args.source_arvados)
108     dst_arv = api_for_instance(args.destination_arvados)
109
110     if not args.project_uuid:
111         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
112
113     # Identify the kind of object we have been given, and begin copying.
114     t = uuid_type(src_arv, args.object_uuid)
115     if t == 'Collection':
116         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
117         result = copy_collection(args.object_uuid,
118                                  src_arv, dst_arv,
119                                  args)
120     elif t == 'PipelineInstance':
121         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
122         result = copy_pipeline_instance(args.object_uuid,
123                                         src_arv, dst_arv,
124                                         args)
125     elif t == 'PipelineTemplate':
126         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
127         result = copy_pipeline_template(args.object_uuid,
128                                         src_arv, dst_arv, args)
129     else:
130         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
131
132     # Clean up any outstanding temp git repositories.
133     for d in local_repo_dir.values():
134         shutil.rmtree(d, ignore_errors=True)
135
136     # If no exception was thrown and the response does not have an
137     # error_token field, presume success
138     if 'error_token' in result or 'uuid' not in result:
139         logger.error("API server returned an error result: {}".format(result))
140         exit(1)
141
142     logger.info("")
143     logger.info("Success: created copy with uuid {}".format(result['uuid']))
144     exit(0)
145
146 def set_src_owner_uuid(resource, uuid, args):
147     global src_owner_uuid
148     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
149     src_owner_uuid = c.get("owner_uuid")
150
151 # api_for_instance(instance_name)
152 #
153 #     Creates an API client for the Arvados instance identified by
154 #     instance_name.
155 #
156 #     If instance_name contains a slash, it is presumed to be a path
157 #     (either local or absolute) to a file with Arvados configuration
158 #     settings.
159 #
160 #     Otherwise, it is presumed to be the name of a file in
161 #     $HOME/.config/arvados/instance_name.conf
162 #
163 def api_for_instance(instance_name):
164     if '/' in instance_name:
165         config_file = instance_name
166     else:
167         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
168
169     try:
170         cfg = arvados.config.load(config_file)
171     except (IOError, OSError) as e:
172         abort(("Could not open config file {}: {}\n" +
173                "You must make sure that your configuration tokens\n" +
174                "for Arvados instance {} are in {} and that this\n" +
175                "file is readable.").format(
176                    config_file, e, instance_name, config_file))
177
178     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
179         api_is_insecure = (
180             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
181                 ['1', 't', 'true', 'y', 'yes']))
182         client = arvados.api('v1',
183                              host=cfg['ARVADOS_API_HOST'],
184                              token=cfg['ARVADOS_API_TOKEN'],
185                              insecure=api_is_insecure,
186                              model=OrderedJsonModel())
187     else:
188         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
189     return client
190
191 # copy_pipeline_instance(pi_uuid, src, dst, args)
192 #
193 #    Copies a pipeline instance identified by pi_uuid from src to dst.
194 #
195 #    If the args.recursive option is set:
196 #      1. Copies all input collections
197 #           * For each component in the pipeline, include all collections
198 #             listed as job dependencies for that component)
199 #      2. Copy docker images
200 #      3. Copy git repositories
201 #      4. Copy the pipeline template
202 #
203 #    The only changes made to the copied pipeline instance are:
204 #      1. The original pipeline instance UUID is preserved in
205 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
206 #      2. The pipeline_template_uuid is changed to the new template uuid.
207 #      3. The owner_uuid of the instance is changed to the user who
208 #         copied it.
209 #
210 def copy_pipeline_instance(pi_uuid, src, dst, args):
211     # Fetch the pipeline instance record.
212     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
213
214     if args.recursive:
215         if not args.dst_git_repo:
216             abort('--dst-git-repo is required when copying a pipeline recursively.')
217         # Copy the pipeline template and save the copied template.
218         if pi.get('pipeline_template_uuid', None):
219             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
220                                         src, dst, args)
221
222         # Copy input collections, docker images and git repos.
223         pi = copy_collections(pi, src, dst, args)
224         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
225         copy_docker_images(pi, src, dst, args)
226
227         # Update the fields of the pipeline instance with the copied
228         # pipeline template.
229         if pi.get('pipeline_template_uuid', None):
230             pi['pipeline_template_uuid'] = pt['uuid']
231
232     else:
233         # not recursive
234         logger.info("Copying only pipeline instance %s.", pi_uuid)
235         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
236
237     # Update the pipeline instance properties, and create the new
238     # instance at dst.
239     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
240     pi['description'] = "Pipeline copied from {}\n\n{}".format(
241         pi_uuid,
242         pi['description'] if pi.get('description', None) else '')
243
244     pi['owner_uuid'] = args.project_uuid
245
246     del pi['uuid']
247
248     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
249     return new_pi
250
251 # copy_pipeline_template(pt_uuid, src, dst, args)
252 #
253 #    Copies a pipeline template identified by pt_uuid from src to dst.
254 #
255 #    If args.recursive is True, also copy any collections, docker
256 #    images and git repositories that this template references.
257 #
258 #    The owner_uuid of the new template is changed to that of the user
259 #    who copied the template.
260 #
261 #    Returns the copied pipeline template object.
262 #
263 def copy_pipeline_template(pt_uuid, src, dst, args):
264     # fetch the pipeline template from the source instance
265     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
266
267     if args.recursive:
268         if not args.dst_git_repo:
269             abort('--dst-git-repo is required when copying a pipeline recursively.')
270         # Copy input collections, docker images and git repos.
271         pt = copy_collections(pt, src, dst, args)
272         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
273         copy_docker_images(pt, src, dst, args)
274
275     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
276         pt_uuid,
277         pt['description'] if pt.get('description', None) else '')
278     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
279     del pt['uuid']
280
281     pt['owner_uuid'] = args.project_uuid
282
283     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
284
285 # copy_collections(obj, src, dst, args)
286 #
287 #    Recursively copies all collections referenced by 'obj' from src
288 #    to dst.  obj may be a dict or a list, in which case we run
289 #    copy_collections on every value it contains. If it is a string,
290 #    search it for any substring that matches a collection hash or uuid
291 #    (this will find hidden references to collections like
292 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
293 #
294 #    Returns a copy of obj with any old collection uuids replaced by
295 #    the new ones.
296 #
297 def copy_collections(obj, src, dst, args):
298
299     def copy_collection_fn(collection_match):
300         """Helper function for regex substitution: copies a single collection,
301         identified by the collection_match MatchObject, to the
302         destination.  Returns the destination collection uuid (or the
303         portable data hash if that's what src_id is).
304
305         """
306         src_id = collection_match.group(0)
307         if src_id not in collections_copied:
308             dst_col = copy_collection(src_id, src, dst, args)
309             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
310                 collections_copied[src_id] = src_id
311             else:
312                 collections_copied[src_id] = dst_col['uuid']
313         return collections_copied[src_id]
314
315     if isinstance(obj, basestring):
316         # Copy any collections identified in this string to dst, replacing
317         # them with the dst uuids as necessary.
318         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
319         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
320         return obj
321     elif type(obj) == dict:
322         return {v: copy_collections(obj[v], src, dst, args) for v in obj}
323     elif type(obj) == list:
324         return [copy_collections(v, src, dst, args) for v in obj]
325     return obj
326
327 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
328     """Copy a job's script to the destination repository, and update its record.
329
330     Given a jobspec dictionary, this function finds the referenced script from
331     src and copies it to dst and dst_repo.  It also updates jobspec in place to
332     refer to names on the destination.
333     """
334     repo = jobspec.get('repository')
335     if repo is None:
336         return
337     # script_version is the "script_version" parameter from the source
338     # component or job.  If no script_version was supplied in the
339     # component or job, it is a mistake in the pipeline, but for the
340     # purposes of copying the repository, default to "master".
341     script_version = jobspec.get('script_version') or 'master'
342     script_key = (repo, script_version)
343     if script_key not in scripts_copied:
344         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
345         scripts_copied.add(script_key)
346     jobspec['repository'] = dst_repo
347     repo_dir = local_repo_dir[repo]
348     for version_key in ['script_version', 'supplied_script_version']:
349         if version_key in jobspec:
350             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
351
352 # copy_git_repos(p, src, dst, dst_repo, args)
353 #
354 #    Copies all git repositories referenced by pipeline instance or
355 #    template 'p' from src to dst.
356 #
357 #    For each component c in the pipeline:
358 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
359 #      * Rename script versions:
360 #          * c['script_version']
361 #          * c['job']['script_version']
362 #          * c['job']['supplied_script_version']
363 #        to the commit hashes they resolve to, since any symbolic
364 #        names (tags, branches) are not preserved in the destination repo.
365 #
366 #    The pipeline object is updated in place with the new repository
367 #    names.  The return value is undefined.
368 #
369 def copy_git_repos(p, src, dst, dst_repo, args):
370     for component in p['components'].itervalues():
371         migrate_jobspec(component, src, dst, dst_repo, args)
372         if 'job' in component:
373             migrate_jobspec(component['job'], src, dst, dst_repo, args)
374
375 def total_collection_size(manifest_text):
376     """Return the total number of bytes in this collection (excluding
377     duplicate blocks)."""
378
379     total_bytes = 0
380     locators_seen = {}
381     for line in manifest_text.splitlines():
382         words = line.split()
383         for word in words[1:]:
384             try:
385                 loc = arvados.KeepLocator(word)
386             except ValueError:
387                 continue  # this word isn't a locator, skip it
388             if loc.md5sum not in locators_seen:
389                 locators_seen[loc.md5sum] = True
390                 total_bytes += loc.size
391
392     return total_bytes
393
394 def create_collection_from(c, src, dst, args):
395     """Create a new collection record on dst, and copy Docker metadata if
396     available."""
397
398     collection_uuid = c['uuid']
399     del c['uuid']
400
401     if not c["name"]:
402         c['name'] = "copied from " + collection_uuid
403
404     if 'properties' in c:
405         del c['properties']
406
407     c['owner_uuid'] = args.project_uuid
408
409     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
410
411     # Create docker_image_repo+tag and docker_image_hash links
412     # at the destination.
413     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
414         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
415
416         for src_link in docker_links:
417             body = {key: src_link[key]
418                     for key in ['link_class', 'name', 'properties']}
419             body['head_uuid'] = dst_collection['uuid']
420             body['owner_uuid'] = args.project_uuid
421
422             lk = dst.links().create(body=body).execute(num_retries=args.retries)
423             logger.debug('created dst link {}'.format(lk))
424
425     return dst_collection
426
427 # copy_collection(obj_uuid, src, dst, args)
428 #
429 #    Copies the collection identified by obj_uuid from src to dst.
430 #    Returns the collection object created at dst.
431 #
432 #    If args.progress is True, produce a human-friendly progress
433 #    report.
434 #
435 #    If a collection with the desired portable_data_hash already
436 #    exists at dst, and args.force is False, copy_collection returns
437 #    the existing collection without copying any blocks.  Otherwise
438 #    (if no collection exists or if args.force is True)
439 #    copy_collection copies all of the collection data blocks from src
440 #    to dst.
441 #
442 #    For this application, it is critical to preserve the
443 #    collection's manifest hash, which is not guaranteed with the
444 #    arvados.CollectionReader and arvados.CollectionWriter classes.
445 #    Copying each block in the collection manually, followed by
446 #    the manifest block, ensures that the collection's manifest
447 #    hash will not change.
448 #
449 def copy_collection(obj_uuid, src, dst, args):
450     if arvados.util.keep_locator_pattern.match(obj_uuid):
451         # If the obj_uuid is a portable data hash, it might not be uniquely
452         # identified with a particular collection.  As a result, it is
453         # ambigious as to what name to use for the copy.  Apply some heuristics
454         # to pick which collection to get the name from.
455         srccol = src.collections().list(
456             filters=[['portable_data_hash', '=', obj_uuid]],
457             order="created_at asc"
458             ).execute(num_retries=args.retries)
459
460         items = srccol.get("items")
461
462         if not items:
463             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
464             return
465
466         c = None
467
468         if len(items) == 1:
469             # There's only one collection with the PDH, so use that.
470             c = items[0]
471         if not c:
472             # See if there is a collection that's in the same project
473             # as the root item (usually a pipeline) being copied.
474             for i in items:
475                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
476                     c = i
477                     break
478         if not c:
479             # Didn't find any collections located in the same project, so
480             # pick the oldest collection that has a name assigned to it.
481             for i in items:
482                 if i.get("name"):
483                     c = i
484                     break
485         if not c:
486             # None of the collections have names (?!), so just pick the
487             # first one.
488             c = items[0]
489
490         # list() doesn't return manifest text (and we don't want it to,
491         # because we don't need the same maninfest text sent to us 50
492         # times) so go and retrieve the collection object directly
493         # which will include the manifest text.
494         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
495     else:
496         # Assume this is an actual collection uuid, so fetch it directly.
497         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
498
499     # If a collection with this hash already exists at the
500     # destination, and 'force' is not true, just return that
501     # collection.
502     if not args.force:
503         if 'portable_data_hash' in c:
504             colhash = c['portable_data_hash']
505         else:
506             colhash = c['uuid']
507         dstcol = dst.collections().list(
508             filters=[['portable_data_hash', '=', colhash]]
509         ).execute(num_retries=args.retries)
510         if dstcol['items_available'] > 0:
511             for d in dstcol['items']:
512                 if ((args.project_uuid == d['owner_uuid']) and
513                     (c.get('name') == d['name']) and
514                     (c['portable_data_hash'] == d['portable_data_hash'])):
515                     return d
516             c['manifest_text'] = dst.collections().get(
517                 uuid=dstcol['items'][0]['uuid']
518             ).execute(num_retries=args.retries)['manifest_text']
519             return create_collection_from(c, src, dst, args)
520
521     # Fetch the collection's manifest.
522     manifest = c['manifest_text']
523     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
524
525     # Copy each block from src_keep to dst_keep.
526     # Use the newly signed locators returned from dst_keep to build
527     # a new manifest as we go.
528     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
529     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
530     dst_manifest = ""
531     dst_locators = {}
532     bytes_written = 0
533     bytes_expected = total_collection_size(manifest)
534     if args.progress:
535         progress_writer = ProgressWriter(human_progress)
536     else:
537         progress_writer = None
538
539     for line in manifest.splitlines(True):
540         words = line.split()
541         dst_manifest_line = words[0]
542         for word in words[1:]:
543             try:
544                 loc = arvados.KeepLocator(word)
545                 blockhash = loc.md5sum
546                 # copy this block if we haven't seen it before
547                 # (otherwise, just reuse the existing dst_locator)
548                 if blockhash not in dst_locators:
549                     logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
550                     if progress_writer:
551                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
552                     data = src_keep.get(word)
553                     dst_locator = dst_keep.put(data)
554                     dst_locators[blockhash] = dst_locator
555                     bytes_written += loc.size
556                 dst_manifest_line += ' ' + dst_locators[blockhash]
557             except ValueError:
558                 # If 'word' can't be parsed as a locator,
559                 # presume it's a filename.
560                 dst_manifest_line += ' ' + word
561         dst_manifest += dst_manifest_line
562         if line.endswith("\n"):
563             dst_manifest += "\n"
564
565     if progress_writer:
566         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
567         progress_writer.finish()
568
569     # Copy the manifest and save the collection.
570     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
571
572     dst_keep.put(dst_manifest.encode('utf-8'))
573     c['manifest_text'] = dst_manifest
574     return create_collection_from(c, src, dst, args)
575
576 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
577 #
578 #    Copies commits from git repository 'src_git_repo' on Arvados
579 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
580 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
581 #    or "jsmith")
582 #
583 #    All commits will be copied to a destination branch named for the
584 #    source repository URL.
585 #
586 #    The destination repository must already exist.
587 #
588 #    The user running this command must be authenticated
589 #    to both repositories.
590 #
591 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
592     # Identify the fetch and push URLs for the git repositories.
593     r = src.repositories().list(
594         filters=[['name', '=', src_git_repo]]).execute(num_retries=args.retries)
595     if r['items_available'] != 1:
596         raise Exception('cannot identify source repo {}; {} repos found'
597                         .format(src_git_repo, r['items_available']))
598     src_git_url = r['items'][0]['fetch_url']
599     logger.debug('src_git_url: {}'.format(src_git_url))
600
601     r = dst.repositories().list(
602         filters=[['name', '=', dst_git_repo]]).execute(num_retries=args.retries)
603     if r['items_available'] != 1:
604         raise Exception('cannot identify destination repo {}; {} repos found'
605                         .format(dst_git_repo, r['items_available']))
606     dst_git_push_url  = r['items'][0]['push_url']
607     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
608
609     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
610
611     # Copy git commits from src repo to dst repo.
612     if src_git_repo not in local_repo_dir:
613         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
614         arvados.util.run_command(
615             ["git", "clone", "--bare", src_git_url,
616              local_repo_dir[src_git_repo]],
617             cwd=os.path.dirname(local_repo_dir[src_git_repo]))
618         arvados.util.run_command(
619             ["git", "remote", "add", "dst", dst_git_push_url],
620             cwd=local_repo_dir[src_git_repo])
621     arvados.util.run_command(
622         ["git", "branch", dst_branch, script_version],
623         cwd=local_repo_dir[src_git_repo])
624     arvados.util.run_command(["git", "push", "dst", dst_branch],
625                              cwd=local_repo_dir[src_git_repo])
626
627 def copy_docker_images(pipeline, src, dst, args):
628     """Copy any docker images named in the pipeline components'
629     runtime_constraints field from src to dst."""
630
631     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
632     for c_name, c_info in pipeline['components'].iteritems():
633         if ('runtime_constraints' in c_info and
634             'docker_image' in c_info['runtime_constraints']):
635             copy_docker_image(
636                 c_info['runtime_constraints']['docker_image'],
637                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
638                 src, dst, args)
639
640
641 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
642     """Copy the docker image identified by docker_image and
643     docker_image_tag from src to dst. Create appropriate
644     docker_image_repo+tag and docker_image_hash links at dst.
645
646     """
647
648     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
649
650     # Find the link identifying this docker image.
651     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
652         src, args.retries, docker_image, docker_image_tag)
653     if docker_image_list:
654         image_uuid, image_info = docker_image_list[0]
655         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
656
657         # Copy the collection it refers to.
658         dst_image_col = copy_collection(image_uuid, src, dst, args)
659     elif arvados.util.keep_locator_pattern.match(docker_image):
660         dst_image_col = copy_collection(docker_image, src, dst, args)
661     else:
662         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
663
664 # git_rev_parse(rev, repo)
665 #
666 #    Returns the 40-character commit hash corresponding to 'rev' in
667 #    git repository 'repo' (which must be the path of a local git
668 #    repository)
669 #
670 def git_rev_parse(rev, repo):
671     gitout, giterr = arvados.util.run_command(
672         ['git', 'rev-parse', rev], cwd=repo)
673     return gitout.strip()
674
675 # uuid_type(api, object_uuid)
676 #
677 #    Returns the name of the class that object_uuid belongs to, based on
678 #    the second field of the uuid.  This function consults the api's
679 #    schema to identify the object class.
680 #
681 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
682 #
683 #    Special case: if handed a Keep locator hash, return 'Collection'.
684 #
685 def uuid_type(api, object_uuid):
686     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
687         return 'Collection'
688     p = object_uuid.split('-')
689     if len(p) == 3:
690         type_prefix = p[1]
691         for k in api._schema.schemas:
692             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
693             if type_prefix == obj_class:
694                 return k
695     return None
696
697 def abort(msg, code=1):
698     logger.info("arv-copy: %s", msg)
699     exit(code)
700
701
702 # Code for reporting on the progress of a collection upload.
703 # Stolen from arvados.commands.put.ArvPutCollectionWriter
704 # TODO(twp): figure out how to refactor into a shared library
705 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
706 # code)
707
708 def machine_progress(obj_uuid, bytes_written, bytes_expected):
709     return "{} {}: {} {} written {} total\n".format(
710         sys.argv[0],
711         os.getpid(),
712         obj_uuid,
713         bytes_written,
714         -1 if (bytes_expected is None) else bytes_expected)
715
716 def human_progress(obj_uuid, bytes_written, bytes_expected):
717     if bytes_expected:
718         return "\r{}: {}M / {}M {:.1%} ".format(
719             obj_uuid,
720             bytes_written >> 20, bytes_expected >> 20,
721             float(bytes_written) / bytes_expected)
722     else:
723         return "\r{}: {} ".format(obj_uuid, bytes_written)
724
725 class ProgressWriter(object):
726     _progress_func = None
727     outfile = sys.stderr
728
729     def __init__(self, progress_func):
730         self._progress_func = progress_func
731
732     def report(self, obj_uuid, bytes_written, bytes_expected):
733         if self._progress_func is not None:
734             self.outfile.write(
735                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
736
737     def finish(self):
738         self.outfile.write("\n")
739
740 if __name__ == '__main__':
741     main()