7225: ArvadosFile.flush() commits all underlying blocks.
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31 import arvados.util
32 import arvados.commands._util as arv_cmd
33 import arvados.commands.keepdocker
34
35 from arvados.api import OrderedJsonModel
36
37 logger = logging.getLogger('arvados.arv-copy')
38
39 # local_repo_dir records which git repositories from the Arvados source
40 # instance have been checked out locally during this run, and to which
41 # directories.
42 # e.g. if repository 'twp' from src_arv has been cloned into
43 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
44 #
45 local_repo_dir = {}
46
47 # List of collections that have been copied in this session, and their
48 # destination collection UUIDs.
49 collections_copied = {}
50
51 # Set of (repository, script_version) two-tuples of commits copied in git.
52 scripts_copied = set()
53
54 # The owner_uuid of the object being copied
55 src_owner_uuid = None
56
57 def main():
58     copy_opts = argparse.ArgumentParser(add_help=False)
59
60     copy_opts.add_argument(
61         '-v', '--verbose', dest='verbose', action='store_true',
62         help='Verbose output.')
63     copy_opts.add_argument(
64         '--progress', dest='progress', action='store_true',
65         help='Report progress on copying collections. (default)')
66     copy_opts.add_argument(
67         '--no-progress', dest='progress', action='store_false',
68         help='Do not report progress on copying collections.')
69     copy_opts.add_argument(
70         '-f', '--force', dest='force', action='store_true',
71         help='Perform copy even if the object appears to exist at the remote destination.')
72     copy_opts.add_argument(
73         '--src', dest='source_arvados', required=True,
74         help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
75     copy_opts.add_argument(
76         '--dst', dest='destination_arvados', required=True,
77         help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
78     copy_opts.add_argument(
79         '--recursive', dest='recursive', action='store_true',
80         help='Recursively copy any dependencies for this object. (default)')
81     copy_opts.add_argument(
82         '--no-recursive', dest='recursive', action='store_false',
83         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
84     copy_opts.add_argument(
85         '--dst-git-repo', dest='dst_git_repo',
86         help='The name of the destination git repository. Required when copying a pipeline recursively.')
87     copy_opts.add_argument(
88         '--project-uuid', dest='project_uuid',
89         help='The UUID of the project at the destination to which the pipeline should be copied.')
90     copy_opts.add_argument(
91         'object_uuid',
92         help='The UUID of the object to be copied.')
93     copy_opts.set_defaults(progress=True)
94     copy_opts.set_defaults(recursive=True)
95
96     parser = argparse.ArgumentParser(
97         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
98         parents=[copy_opts, arv_cmd.retry_opt])
99     args = parser.parse_args()
100
101     if args.verbose:
102         logger.setLevel(logging.DEBUG)
103     else:
104         logger.setLevel(logging.INFO)
105
106     # Create API clients for the source and destination instances
107     src_arv = api_for_instance(args.source_arvados)
108     dst_arv = api_for_instance(args.destination_arvados)
109
110     if not args.project_uuid:
111         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
112
113     # Identify the kind of object we have been given, and begin copying.
114     t = uuid_type(src_arv, args.object_uuid)
115     if t == 'Collection':
116         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
117         result = copy_collection(args.object_uuid,
118                                  src_arv, dst_arv,
119                                  args)
120     elif t == 'PipelineInstance':
121         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
122         result = copy_pipeline_instance(args.object_uuid,
123                                         src_arv, dst_arv,
124                                         args)
125     elif t == 'PipelineTemplate':
126         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
127         result = copy_pipeline_template(args.object_uuid,
128                                         src_arv, dst_arv, args)
129     else:
130         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
131
132     # Clean up any outstanding temp git repositories.
133     for d in local_repo_dir.values():
134         shutil.rmtree(d, ignore_errors=True)
135
136     # If no exception was thrown and the response does not have an
137     # error_token field, presume success
138     if 'error_token' in result or 'uuid' not in result:
139         logger.error("API server returned an error result: {}".format(result))
140         exit(1)
141
142     logger.info("")
143     logger.info("Success: created copy with uuid {}".format(result['uuid']))
144     exit(0)
145
146 def set_src_owner_uuid(resource, uuid, args):
147     global src_owner_uuid
148     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
149     src_owner_uuid = c.get("owner_uuid")
150
151 # api_for_instance(instance_name)
152 #
153 #     Creates an API client for the Arvados instance identified by
154 #     instance_name.
155 #
156 #     If instance_name contains a slash, it is presumed to be a path
157 #     (either local or absolute) to a file with Arvados configuration
158 #     settings.
159 #
160 #     Otherwise, it is presumed to be the name of a file in
161 #     $HOME/.config/arvados/instance_name.conf
162 #
163 def api_for_instance(instance_name):
164     if '/' in instance_name:
165         config_file = instance_name
166     else:
167         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
168
169     try:
170         cfg = arvados.config.load(config_file)
171     except (IOError, OSError) as e:
172         abort(("Could not open config file {}: {}\n" +
173                "You must make sure that your configuration tokens\n" +
174                "for Arvados instance {} are in {} and that this\n" +
175                "file is readable.").format(
176                    config_file, e, instance_name, config_file))
177
178     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
179         api_is_insecure = (
180             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
181                 ['1', 't', 'true', 'y', 'yes']))
182         client = arvados.api('v1',
183                              host=cfg['ARVADOS_API_HOST'],
184                              token=cfg['ARVADOS_API_TOKEN'],
185                              insecure=api_is_insecure,
186                              model=OrderedJsonModel())
187     else:
188         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
189     return client
190
191 # Check if git is available
192 def check_git_availability():
193     try:
194         arvados.util.run_command(['git', '--help'])
195     except Exception:
196         abort('git command is not available. Please ensure git is installed.')
197
198 # copy_pipeline_instance(pi_uuid, src, dst, args)
199 #
200 #    Copies a pipeline instance identified by pi_uuid from src to dst.
201 #
202 #    If the args.recursive option is set:
203 #      1. Copies all input collections
204 #           * For each component in the pipeline, include all collections
205 #             listed as job dependencies for that component)
206 #      2. Copy docker images
207 #      3. Copy git repositories
208 #      4. Copy the pipeline template
209 #
210 #    The only changes made to the copied pipeline instance are:
211 #      1. The original pipeline instance UUID is preserved in
212 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
213 #      2. The pipeline_template_uuid is changed to the new template uuid.
214 #      3. The owner_uuid of the instance is changed to the user who
215 #         copied it.
216 #
217 def copy_pipeline_instance(pi_uuid, src, dst, args):
218     # Fetch the pipeline instance record.
219     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
220
221     if args.recursive:
222         check_git_availability()
223
224         if not args.dst_git_repo:
225             abort('--dst-git-repo is required when copying a pipeline recursively.')
226         # Copy the pipeline template and save the copied template.
227         if pi.get('pipeline_template_uuid', None):
228             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
229                                         src, dst, args)
230
231         # Copy input collections, docker images and git repos.
232         pi = copy_collections(pi, src, dst, args)
233         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
234         copy_docker_images(pi, src, dst, args)
235
236         # Update the fields of the pipeline instance with the copied
237         # pipeline template.
238         if pi.get('pipeline_template_uuid', None):
239             pi['pipeline_template_uuid'] = pt['uuid']
240
241     else:
242         # not recursive
243         logger.info("Copying only pipeline instance %s.", pi_uuid)
244         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
245
246     # Update the pipeline instance properties, and create the new
247     # instance at dst.
248     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
249     pi['description'] = "Pipeline copied from {}\n\n{}".format(
250         pi_uuid,
251         pi['description'] if pi.get('description', None) else '')
252
253     pi['owner_uuid'] = args.project_uuid
254
255     del pi['uuid']
256
257     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
258     return new_pi
259
260 # copy_pipeline_template(pt_uuid, src, dst, args)
261 #
262 #    Copies a pipeline template identified by pt_uuid from src to dst.
263 #
264 #    If args.recursive is True, also copy any collections, docker
265 #    images and git repositories that this template references.
266 #
267 #    The owner_uuid of the new template is changed to that of the user
268 #    who copied the template.
269 #
270 #    Returns the copied pipeline template object.
271 #
272 def copy_pipeline_template(pt_uuid, src, dst, args):
273     # fetch the pipeline template from the source instance
274     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
275
276     if args.recursive:
277         check_git_availability()
278
279         if not args.dst_git_repo:
280             abort('--dst-git-repo is required when copying a pipeline recursively.')
281         # Copy input collections, docker images and git repos.
282         pt = copy_collections(pt, src, dst, args)
283         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
284         copy_docker_images(pt, src, dst, args)
285
286     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
287         pt_uuid,
288         pt['description'] if pt.get('description', None) else '')
289     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
290     del pt['uuid']
291
292     pt['owner_uuid'] = args.project_uuid
293
294     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
295
296 # copy_collections(obj, src, dst, args)
297 #
298 #    Recursively copies all collections referenced by 'obj' from src
299 #    to dst.  obj may be a dict or a list, in which case we run
300 #    copy_collections on every value it contains. If it is a string,
301 #    search it for any substring that matches a collection hash or uuid
302 #    (this will find hidden references to collections like
303 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
304 #
305 #    Returns a copy of obj with any old collection uuids replaced by
306 #    the new ones.
307 #
308 def copy_collections(obj, src, dst, args):
309
310     def copy_collection_fn(collection_match):
311         """Helper function for regex substitution: copies a single collection,
312         identified by the collection_match MatchObject, to the
313         destination.  Returns the destination collection uuid (or the
314         portable data hash if that's what src_id is).
315
316         """
317         src_id = collection_match.group(0)
318         if src_id not in collections_copied:
319             dst_col = copy_collection(src_id, src, dst, args)
320             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
321                 collections_copied[src_id] = src_id
322             else:
323                 collections_copied[src_id] = dst_col['uuid']
324         return collections_copied[src_id]
325
326     if isinstance(obj, basestring):
327         # Copy any collections identified in this string to dst, replacing
328         # them with the dst uuids as necessary.
329         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
330         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
331         return obj
332     elif isinstance(obj, dict):
333         return type(obj)((v, copy_collections(obj[v], src, dst, args))
334                          for v in obj)
335     elif isinstance(obj, list):
336         return type(obj)(copy_collections(v, src, dst, args) for v in obj)
337     return obj
338
339 def migrate_jobspec(jobspec, src, dst, dst_repo, args):
340     """Copy a job's script to the destination repository, and update its record.
341
342     Given a jobspec dictionary, this function finds the referenced script from
343     src and copies it to dst and dst_repo.  It also updates jobspec in place to
344     refer to names on the destination.
345     """
346     repo = jobspec.get('repository')
347     if repo is None:
348         return
349     # script_version is the "script_version" parameter from the source
350     # component or job.  If no script_version was supplied in the
351     # component or job, it is a mistake in the pipeline, but for the
352     # purposes of copying the repository, default to "master".
353     script_version = jobspec.get('script_version') or 'master'
354     script_key = (repo, script_version)
355     if script_key not in scripts_copied:
356         copy_git_repo(repo, src, dst, dst_repo, script_version, args)
357         scripts_copied.add(script_key)
358     jobspec['repository'] = dst_repo
359     repo_dir = local_repo_dir[repo]
360     for version_key in ['script_version', 'supplied_script_version']:
361         if version_key in jobspec:
362             jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
363
364 # copy_git_repos(p, src, dst, dst_repo, args)
365 #
366 #    Copies all git repositories referenced by pipeline instance or
367 #    template 'p' from src to dst.
368 #
369 #    For each component c in the pipeline:
370 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
371 #      * Rename script versions:
372 #          * c['script_version']
373 #          * c['job']['script_version']
374 #          * c['job']['supplied_script_version']
375 #        to the commit hashes they resolve to, since any symbolic
376 #        names (tags, branches) are not preserved in the destination repo.
377 #
378 #    The pipeline object is updated in place with the new repository
379 #    names.  The return value is undefined.
380 #
381 def copy_git_repos(p, src, dst, dst_repo, args):
382     for component in p['components'].itervalues():
383         migrate_jobspec(component, src, dst, dst_repo, args)
384         if 'job' in component:
385             migrate_jobspec(component['job'], src, dst, dst_repo, args)
386
387 def total_collection_size(manifest_text):
388     """Return the total number of bytes in this collection (excluding
389     duplicate blocks)."""
390
391     total_bytes = 0
392     locators_seen = {}
393     for line in manifest_text.splitlines():
394         words = line.split()
395         for word in words[1:]:
396             try:
397                 loc = arvados.KeepLocator(word)
398             except ValueError:
399                 continue  # this word isn't a locator, skip it
400             if loc.md5sum not in locators_seen:
401                 locators_seen[loc.md5sum] = True
402                 total_bytes += loc.size
403
404     return total_bytes
405
406 def create_collection_from(c, src, dst, args):
407     """Create a new collection record on dst, and copy Docker metadata if
408     available."""
409
410     collection_uuid = c['uuid']
411     del c['uuid']
412
413     if not c["name"]:
414         c['name'] = "copied from " + collection_uuid
415
416     if 'properties' in c:
417         del c['properties']
418
419     c['owner_uuid'] = args.project_uuid
420
421     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
422
423     # Create docker_image_repo+tag and docker_image_hash links
424     # at the destination.
425     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
426         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
427
428         for src_link in docker_links:
429             body = {key: src_link[key]
430                     for key in ['link_class', 'name', 'properties']}
431             body['head_uuid'] = dst_collection['uuid']
432             body['owner_uuid'] = args.project_uuid
433
434             lk = dst.links().create(body=body).execute(num_retries=args.retries)
435             logger.debug('created dst link {}'.format(lk))
436
437     return dst_collection
438
439 # copy_collection(obj_uuid, src, dst, args)
440 #
441 #    Copies the collection identified by obj_uuid from src to dst.
442 #    Returns the collection object created at dst.
443 #
444 #    If args.progress is True, produce a human-friendly progress
445 #    report.
446 #
447 #    If a collection with the desired portable_data_hash already
448 #    exists at dst, and args.force is False, copy_collection returns
449 #    the existing collection without copying any blocks.  Otherwise
450 #    (if no collection exists or if args.force is True)
451 #    copy_collection copies all of the collection data blocks from src
452 #    to dst.
453 #
454 #    For this application, it is critical to preserve the
455 #    collection's manifest hash, which is not guaranteed with the
456 #    arvados.CollectionReader and arvados.CollectionWriter classes.
457 #    Copying each block in the collection manually, followed by
458 #    the manifest block, ensures that the collection's manifest
459 #    hash will not change.
460 #
461 def copy_collection(obj_uuid, src, dst, args):
462     if arvados.util.keep_locator_pattern.match(obj_uuid):
463         # If the obj_uuid is a portable data hash, it might not be uniquely
464         # identified with a particular collection.  As a result, it is
465         # ambigious as to what name to use for the copy.  Apply some heuristics
466         # to pick which collection to get the name from.
467         srccol = src.collections().list(
468             filters=[['portable_data_hash', '=', obj_uuid]],
469             order="created_at asc"
470             ).execute(num_retries=args.retries)
471
472         items = srccol.get("items")
473
474         if not items:
475             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
476             return
477
478         c = None
479
480         if len(items) == 1:
481             # There's only one collection with the PDH, so use that.
482             c = items[0]
483         if not c:
484             # See if there is a collection that's in the same project
485             # as the root item (usually a pipeline) being copied.
486             for i in items:
487                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
488                     c = i
489                     break
490         if not c:
491             # Didn't find any collections located in the same project, so
492             # pick the oldest collection that has a name assigned to it.
493             for i in items:
494                 if i.get("name"):
495                     c = i
496                     break
497         if not c:
498             # None of the collections have names (?!), so just pick the
499             # first one.
500             c = items[0]
501
502         # list() doesn't return manifest text (and we don't want it to,
503         # because we don't need the same maninfest text sent to us 50
504         # times) so go and retrieve the collection object directly
505         # which will include the manifest text.
506         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
507     else:
508         # Assume this is an actual collection uuid, so fetch it directly.
509         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
510
511     # If a collection with this hash already exists at the
512     # destination, and 'force' is not true, just return that
513     # collection.
514     if not args.force:
515         if 'portable_data_hash' in c:
516             colhash = c['portable_data_hash']
517         else:
518             colhash = c['uuid']
519         dstcol = dst.collections().list(
520             filters=[['portable_data_hash', '=', colhash]]
521         ).execute(num_retries=args.retries)
522         if dstcol['items_available'] > 0:
523             for d in dstcol['items']:
524                 if ((args.project_uuid == d['owner_uuid']) and
525                     (c.get('name') == d['name']) and
526                     (c['portable_data_hash'] == d['portable_data_hash'])):
527                     return d
528             c['manifest_text'] = dst.collections().get(
529                 uuid=dstcol['items'][0]['uuid']
530             ).execute(num_retries=args.retries)['manifest_text']
531             return create_collection_from(c, src, dst, args)
532
533     # Fetch the collection's manifest.
534     manifest = c['manifest_text']
535     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
536
537     # Copy each block from src_keep to dst_keep.
538     # Use the newly signed locators returned from dst_keep to build
539     # a new manifest as we go.
540     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
541     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
542     dst_manifest = ""
543     dst_locators = {}
544     bytes_written = 0
545     bytes_expected = total_collection_size(manifest)
546     if args.progress:
547         progress_writer = ProgressWriter(human_progress)
548     else:
549         progress_writer = None
550
551     for line in manifest.splitlines():
552         words = line.split()
553         dst_manifest += words[0]
554         for word in words[1:]:
555             try:
556                 loc = arvados.KeepLocator(word)
557             except ValueError:
558                 # If 'word' can't be parsed as a locator,
559                 # presume it's a filename.
560                 dst_manifest += ' ' + word
561                 continue
562             blockhash = loc.md5sum
563             # copy this block if we haven't seen it before
564             # (otherwise, just reuse the existing dst_locator)
565             if blockhash not in dst_locators:
566                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
567                 if progress_writer:
568                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
569                 data = src_keep.get(word)
570                 dst_locator = dst_keep.put(data)
571                 dst_locators[blockhash] = dst_locator
572                 bytes_written += loc.size
573             dst_manifest += ' ' + dst_locators[blockhash]
574         dst_manifest += "\n"
575
576     if progress_writer:
577         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
578         progress_writer.finish()
579
580     # Copy the manifest and save the collection.
581     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
582
583     c['manifest_text'] = dst_manifest
584     return create_collection_from(c, src, dst, args)
585
586 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
587 #
588 #    Copies commits from git repository 'src_git_repo' on Arvados
589 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
590 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
591 #    or "jsmith")
592 #
593 #    All commits will be copied to a destination branch named for the
594 #    source repository URL.
595 #
596 #    The destination repository must already exist.
597 #
598 #    The user running this command must be authenticated
599 #    to both repositories.
600 #
601 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
602     # Identify the fetch and push URLs for the git repositories.
603     r = src.repositories().list(
604         filters=[['name', '=', src_git_repo]]).execute(num_retries=args.retries)
605     if r['items_available'] != 1:
606         raise Exception('cannot identify source repo {}; {} repos found'
607                         .format(src_git_repo, r['items_available']))
608     src_git_url = r['items'][0]['fetch_url']
609     logger.debug('src_git_url: {}'.format(src_git_url))
610
611     r = dst.repositories().list(
612         filters=[['name', '=', dst_git_repo]]).execute(num_retries=args.retries)
613     if r['items_available'] != 1:
614         raise Exception('cannot identify destination repo {}; {} repos found'
615                         .format(dst_git_repo, r['items_available']))
616     dst_git_push_url  = r['items'][0]['push_url']
617     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
618
619     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
620
621     # Copy git commits from src repo to dst repo.
622     if src_git_repo not in local_repo_dir:
623         local_repo_dir[src_git_repo] = tempfile.mkdtemp()
624         arvados.util.run_command(
625             ["git", "clone", "--bare", src_git_url,
626              local_repo_dir[src_git_repo]],
627             cwd=os.path.dirname(local_repo_dir[src_git_repo]))
628         arvados.util.run_command(
629             ["git", "remote", "add", "dst", dst_git_push_url],
630             cwd=local_repo_dir[src_git_repo])
631     arvados.util.run_command(
632         ["git", "branch", dst_branch, script_version],
633         cwd=local_repo_dir[src_git_repo])
634     arvados.util.run_command(["git", "push", "dst", dst_branch],
635                              cwd=local_repo_dir[src_git_repo])
636
637 def copy_docker_images(pipeline, src, dst, args):
638     """Copy any docker images named in the pipeline components'
639     runtime_constraints field from src to dst."""
640
641     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
642     for c_name, c_info in pipeline['components'].iteritems():
643         if ('runtime_constraints' in c_info and
644             'docker_image' in c_info['runtime_constraints']):
645             copy_docker_image(
646                 c_info['runtime_constraints']['docker_image'],
647                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
648                 src, dst, args)
649
650
651 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
652     """Copy the docker image identified by docker_image and
653     docker_image_tag from src to dst. Create appropriate
654     docker_image_repo+tag and docker_image_hash links at dst.
655
656     """
657
658     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
659
660     # Find the link identifying this docker image.
661     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
662         src, args.retries, docker_image, docker_image_tag)
663     if docker_image_list:
664         image_uuid, image_info = docker_image_list[0]
665         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
666
667         # Copy the collection it refers to.
668         dst_image_col = copy_collection(image_uuid, src, dst, args)
669     elif arvados.util.keep_locator_pattern.match(docker_image):
670         dst_image_col = copy_collection(docker_image, src, dst, args)
671     else:
672         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
673
674 # git_rev_parse(rev, repo)
675 #
676 #    Returns the 40-character commit hash corresponding to 'rev' in
677 #    git repository 'repo' (which must be the path of a local git
678 #    repository)
679 #
680 def git_rev_parse(rev, repo):
681     gitout, giterr = arvados.util.run_command(
682         ['git', 'rev-parse', rev], cwd=repo)
683     return gitout.strip()
684
685 # uuid_type(api, object_uuid)
686 #
687 #    Returns the name of the class that object_uuid belongs to, based on
688 #    the second field of the uuid.  This function consults the api's
689 #    schema to identify the object class.
690 #
691 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
692 #
693 #    Special case: if handed a Keep locator hash, return 'Collection'.
694 #
695 def uuid_type(api, object_uuid):
696     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
697         return 'Collection'
698     p = object_uuid.split('-')
699     if len(p) == 3:
700         type_prefix = p[1]
701         for k in api._schema.schemas:
702             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
703             if type_prefix == obj_class:
704                 return k
705     return None
706
707 def abort(msg, code=1):
708     logger.info("arv-copy: %s", msg)
709     exit(code)
710
711
712 # Code for reporting on the progress of a collection upload.
713 # Stolen from arvados.commands.put.ArvPutCollectionWriter
714 # TODO(twp): figure out how to refactor into a shared library
715 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
716 # code)
717
718 def machine_progress(obj_uuid, bytes_written, bytes_expected):
719     return "{} {}: {} {} written {} total\n".format(
720         sys.argv[0],
721         os.getpid(),
722         obj_uuid,
723         bytes_written,
724         -1 if (bytes_expected is None) else bytes_expected)
725
726 def human_progress(obj_uuid, bytes_written, bytes_expected):
727     if bytes_expected:
728         return "\r{}: {}M / {}M {:.1%} ".format(
729             obj_uuid,
730             bytes_written >> 20, bytes_expected >> 20,
731             float(bytes_written) / bytes_expected)
732     else:
733         return "\r{}: {} ".format(obj_uuid, bytes_written)
734
735 class ProgressWriter(object):
736     _progress_func = None
737     outfile = sys.stderr
738
739     def __init__(self, progress_func):
740         self._progress_func = progress_func
741
742     def report(self, obj_uuid, bytes_written, bytes_expected):
743         if self._progress_func is not None:
744             self.outfile.write(
745                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
746
747     def finish(self):
748         self.outfile.write("\n")
749
750 if __name__ == '__main__':
751     main()