Merge branch '4956-limit-request-size' refs #4956
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31 import arvados.util
32 import arvados.commands._util as arv_cmd
33 import arvados.commands.keepdocker
34
35 logger = logging.getLogger('arvados.arv-copy')
36
37 # local_repo_dir records which git repositories from the Arvados source
38 # instance have been checked out locally during this run, and to which
39 # directories.
40 # e.g. if repository 'twp' from src_arv has been cloned into
41 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
42 #
43 local_repo_dir = {}
44
45 # List of collections that have been copied in this session, and their
46 # destination collection UUIDs.
47 collections_copied = {}
48
49 # The owner_uuid of the object being copied
50 src_owner_uuid = None
51
52 def main():
53     copy_opts = argparse.ArgumentParser(add_help=False)
54
55     copy_opts.add_argument(
56         '-v', '--verbose', dest='verbose', action='store_true',
57         help='Verbose output.')
58     copy_opts.add_argument(
59         '--progress', dest='progress', action='store_true',
60         help='Report progress on copying collections. (default)')
61     copy_opts.add_argument(
62         '--no-progress', dest='progress', action='store_false',
63         help='Do not report progress on copying collections.')
64     copy_opts.add_argument(
65         '-f', '--force', dest='force', action='store_true',
66         help='Perform copy even if the object appears to exist at the remote destination.')
67     copy_opts.add_argument(
68         '--src', dest='source_arvados', required=True,
69         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
70     copy_opts.add_argument(
71         '--dst', dest='destination_arvados', required=True,
72         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
73     copy_opts.add_argument(
74         '--recursive', dest='recursive', action='store_true',
75         help='Recursively copy any dependencies for this object. (default)')
76     copy_opts.add_argument(
77         '--no-recursive', dest='recursive', action='store_false',
78         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
79     copy_opts.add_argument(
80         '--dst-git-repo', dest='dst_git_repo',
81         help='The name of the destination git repository. Required when copying a pipeline recursively.')
82     copy_opts.add_argument(
83         '--project-uuid', dest='project_uuid',
84         help='The UUID of the project at the destination to which the pipeline should be copied.')
85     copy_opts.add_argument(
86         'object_uuid',
87         help='The UUID of the object to be copied.')
88     copy_opts.set_defaults(progress=True)
89     copy_opts.set_defaults(recursive=True)
90
91     parser = argparse.ArgumentParser(
92         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
93         parents=[copy_opts, arv_cmd.retry_opt])
94     args = parser.parse_args()
95
96     if args.verbose:
97         logger.setLevel(logging.DEBUG)
98     else:
99         logger.setLevel(logging.INFO)
100
101     # Create API clients for the source and destination instances
102     src_arv = api_for_instance(args.source_arvados)
103     dst_arv = api_for_instance(args.destination_arvados)
104
105     if not args.project_uuid:
106         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
107
108     # Identify the kind of object we have been given, and begin copying.
109     t = uuid_type(src_arv, args.object_uuid)
110     if t == 'Collection':
111         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
112         result = copy_collection(args.object_uuid,
113                                  src_arv, dst_arv,
114                                  args)
115     elif t == 'PipelineInstance':
116         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
117         result = copy_pipeline_instance(args.object_uuid,
118                                         src_arv, dst_arv,
119                                         args)
120     elif t == 'PipelineTemplate':
121         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
122         result = copy_pipeline_template(args.object_uuid,
123                                         src_arv, dst_arv, args)
124     else:
125         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
126
127     # Clean up any outstanding temp git repositories.
128     for d in local_repo_dir.values():
129         shutil.rmtree(d, ignore_errors=True)
130
131     # If no exception was thrown and the response does not have an
132     # error_token field, presume success
133     if 'error_token' in result or 'uuid' not in result:
134         logger.error("API server returned an error result: {}".format(result))
135         exit(1)
136
137     logger.info("")
138     logger.info("Success: created copy with uuid {}".format(result['uuid']))
139     exit(0)
140
141 def set_src_owner_uuid(resource, uuid, args):
142     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
143     src_owner_uuid = c.get("owner_uuid")
144
145 # api_for_instance(instance_name)
146 #
147 #     Creates an API client for the Arvados instance identified by
148 #     instance_name.
149 #
150 #     If instance_name contains a slash, it is presumed to be a path
151 #     (either local or absolute) to a file with Arvados configuration
152 #     settings.
153 #
154 #     Otherwise, it is presumed to be the name of a file in
155 #     $HOME/.config/arvados/instance_name.conf
156 #
157 def api_for_instance(instance_name):
158     if '/' in instance_name:
159         config_file = instance_name
160     else:
161         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
162
163     try:
164         cfg = arvados.config.load(config_file)
165     except (IOError, OSError) as e:
166         abort(("Could not open config file {}: {}\n" +
167                "You must make sure that your configuration tokens\n" +
168                "for Arvados instance {} are in {} and that this\n" +
169                "file is readable.").format(
170                    config_file, e, instance_name, config_file))
171
172     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
173         api_is_insecure = (
174             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
175                 ['1', 't', 'true', 'y', 'yes']))
176         client = arvados.api('v1',
177                              host=cfg['ARVADOS_API_HOST'],
178                              token=cfg['ARVADOS_API_TOKEN'],
179                              insecure=api_is_insecure)
180     else:
181         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
182     return client
183
184 # copy_pipeline_instance(pi_uuid, src, dst, args)
185 #
186 #    Copies a pipeline instance identified by pi_uuid from src to dst.
187 #
188 #    If the args.recursive option is set:
189 #      1. Copies all input collections
190 #           * For each component in the pipeline, include all collections
191 #             listed as job dependencies for that component)
192 #      2. Copy docker images
193 #      3. Copy git repositories
194 #      4. Copy the pipeline template
195 #
196 #    The only changes made to the copied pipeline instance are:
197 #      1. The original pipeline instance UUID is preserved in
198 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
199 #      2. The pipeline_template_uuid is changed to the new template uuid.
200 #      3. The owner_uuid of the instance is changed to the user who
201 #         copied it.
202 #
203 def copy_pipeline_instance(pi_uuid, src, dst, args):
204     # Fetch the pipeline instance record.
205     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
206
207     if args.recursive:
208         if not args.dst_git_repo:
209             abort('--dst-git-repo is required when copying a pipeline recursively.')
210         # Copy the pipeline template and save the copied template.
211         if pi.get('pipeline_template_uuid', None):
212             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
213                                         src, dst, args)
214
215         # Copy input collections, docker images and git repos.
216         pi = copy_collections(pi, src, dst, args)
217         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
218         copy_docker_images(pi, src, dst, args)
219
220         # Update the fields of the pipeline instance with the copied
221         # pipeline template.
222         if pi.get('pipeline_template_uuid', None):
223             pi['pipeline_template_uuid'] = pt['uuid']
224
225     else:
226         # not recursive
227         logger.info("Copying only pipeline instance %s.", pi_uuid)
228         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
229
230     # Update the pipeline instance properties, and create the new
231     # instance at dst.
232     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
233     pi['description'] = "Pipeline copied from {}\n\n{}".format(
234         pi_uuid,
235         pi['description'] if pi.get('description', None) else '')
236
237     pi['owner_uuid'] = args.project_uuid
238
239     del pi['uuid']
240
241     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
242     return new_pi
243
244 # copy_pipeline_template(pt_uuid, src, dst, args)
245 #
246 #    Copies a pipeline template identified by pt_uuid from src to dst.
247 #
248 #    If args.recursive is True, also copy any collections, docker
249 #    images and git repositories that this template references.
250 #
251 #    The owner_uuid of the new template is changed to that of the user
252 #    who copied the template.
253 #
254 #    Returns the copied pipeline template object.
255 #
256 def copy_pipeline_template(pt_uuid, src, dst, args):
257     # fetch the pipeline template from the source instance
258     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
259
260     if args.recursive:
261         if not args.dst_git_repo:
262             abort('--dst-git-repo is required when copying a pipeline recursively.')
263         # Copy input collections, docker images and git repos.
264         pt = copy_collections(pt, src, dst, args)
265         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
266         copy_docker_images(pt, src, dst, args)
267
268     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
269         pt_uuid,
270         pt['description'] if pt.get('description', None) else '')
271     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
272     del pt['uuid']
273
274     pt['owner_uuid'] = args.project_uuid
275
276     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
277
278 # copy_collections(obj, src, dst, args)
279 #
280 #    Recursively copies all collections referenced by 'obj' from src
281 #    to dst.  obj may be a dict or a list, in which case we run
282 #    copy_collections on every value it contains. If it is a string,
283 #    search it for any substring that matches a collection hash or uuid
284 #    (this will find hidden references to collections like
285 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
286 #
287 #    Returns a copy of obj with any old collection uuids replaced by
288 #    the new ones.
289 #
290 def copy_collections(obj, src, dst, args):
291
292     def copy_collection_fn(collection_match):
293         """Helper function for regex substitution: copies a single collection,
294         identified by the collection_match MatchObject, to the
295         destination.  Returns the destination collection uuid (or the
296         portable data hash if that's what src_id is).
297
298         """
299         src_id = collection_match.group(0)
300         if src_id not in collections_copied:
301             dst_col = copy_collection(src_id, src, dst, args)
302             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
303                 collections_copied[src_id] = src_id
304             else:
305                 collections_copied[src_id] = dst_col['uuid']
306         return collections_copied[src_id]
307
308     if isinstance(obj, basestring):
309         # Copy any collections identified in this string to dst, replacing
310         # them with the dst uuids as necessary.
311         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
312         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
313         return obj
314     elif type(obj) == dict:
315         return {v: copy_collections(obj[v], src, dst, args) for v in obj}
316     elif type(obj) == list:
317         return [copy_collections(v, src, dst, args) for v in obj]
318     return obj
319
320 # copy_git_repos(p, src, dst, dst_repo, args)
321 #
322 #    Copies all git repositories referenced by pipeline instance or
323 #    template 'p' from src to dst.
324 #
325 #    For each component c in the pipeline:
326 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
327 #      * Rename script versions:
328 #          * c['script_version']
329 #          * c['job']['script_version']
330 #          * c['job']['supplied_script_version']
331 #        to the commit hashes they resolve to, since any symbolic
332 #        names (tags, branches) are not preserved in the destination repo.
333 #
334 #    The pipeline object is updated in place with the new repository
335 #    names.  The return value is undefined.
336 #
337 def copy_git_repos(p, src, dst, dst_repo, args):
338     copied = set()
339     for c in p['components']:
340         component = p['components'][c]
341         if 'repository' in component:
342             repo = component['repository']
343             script_version = component.get('script_version', None)
344             if repo not in copied:
345                 copy_git_repo(repo, src, dst, dst_repo, script_version, args)
346                 copied.add(repo)
347             component['repository'] = dst_repo
348             if script_version:
349                 repo_dir = local_repo_dir[repo]
350                 component['script_version'] = git_rev_parse(script_version, repo_dir)
351         if 'job' in component:
352             j = component['job']
353             if 'repository' in j:
354                 repo = j['repository']
355                 script_version = j.get('script_version', None)
356                 if repo not in copied:
357                     copy_git_repo(repo, src, dst, dst_repo, script_version, args)
358                     copied.add(repo)
359                 j['repository'] = dst_repo
360                 repo_dir = local_repo_dir[repo]
361                 if script_version:
362                     j['script_version'] = git_rev_parse(script_version, repo_dir)
363                 if 'supplied_script_version' in j:
364                     j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
365
366 def total_collection_size(manifest_text):
367     """Return the total number of bytes in this collection (excluding
368     duplicate blocks)."""
369
370     total_bytes = 0
371     locators_seen = {}
372     for line in manifest_text.splitlines():
373         words = line.split()
374         for word in words[1:]:
375             try:
376                 loc = arvados.KeepLocator(word)
377             except ValueError:
378                 continue  # this word isn't a locator, skip it
379             if loc.md5sum not in locators_seen:
380                 locators_seen[loc.md5sum] = True
381                 total_bytes += loc.size
382
383     return total_bytes
384
385 def create_collection_from(c, src, dst, args):
386     """Create a new collection record on dst, and copy Docker metadata if
387     available."""
388
389     collection_uuid = c['uuid']
390     del c['uuid']
391
392     if not c["name"]:
393         c['name'] = "copied from " + collection_uuid
394
395     if 'properties' in c:
396         del c['properties']
397
398     c['owner_uuid'] = args.project_uuid
399
400     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
401
402     # Create docker_image_repo+tag and docker_image_hash links
403     # at the destination.
404     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
405         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
406
407         for d in docker_links:
408             body={
409                 'head_uuid': dst_collection['uuid'],
410                 'link_class': link_class,
411                 'name': d['name'],
412             }
413             body['owner_uuid'] = args.project_uuid
414
415             lk = dst.links().create(body=body).execute(num_retries=args.retries)
416             logger.debug('created dst link {}'.format(lk))
417
418     return dst_collection
419
420 # copy_collection(obj_uuid, src, dst, args)
421 #
422 #    Copies the collection identified by obj_uuid from src to dst.
423 #    Returns the collection object created at dst.
424 #
425 #    If args.progress is True, produce a human-friendly progress
426 #    report.
427 #
428 #    If a collection with the desired portable_data_hash already
429 #    exists at dst, and args.force is False, copy_collection returns
430 #    the existing collection without copying any blocks.  Otherwise
431 #    (if no collection exists or if args.force is True)
432 #    copy_collection copies all of the collection data blocks from src
433 #    to dst.
434 #
435 #    For this application, it is critical to preserve the
436 #    collection's manifest hash, which is not guaranteed with the
437 #    arvados.CollectionReader and arvados.CollectionWriter classes.
438 #    Copying each block in the collection manually, followed by
439 #    the manifest block, ensures that the collection's manifest
440 #    hash will not change.
441 #
442 def copy_collection(obj_uuid, src, dst, args):
443     if arvados.util.keep_locator_pattern.match(obj_uuid):
444         # If the obj_uuid is a portable data hash, it might not be uniquely
445         # identified with a particular collection.  As a result, it is
446         # ambigious as to what name to use for the copy.  Apply some heuristics
447         # to pick which collection to get the name from.
448         srccol = src.collections().list(
449             filters=[['portable_data_hash', '=', obj_uuid]],
450             order="created_at asc"
451             ).execute(num_retries=args.retries)
452
453         items = srccol.get("items")
454
455         if not items:
456             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
457             return
458
459         c = None
460
461         if len(items) == 1:
462             # There's only one collection with the PDH, so use that.
463             c = items[0]
464         if not c:
465             # See if there is a collection that's in the same project
466             # as the root item (usually a pipeline) being copied.
467             for i in items:
468                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
469                     c = i
470                     break
471         if not c:
472             # Didn't find any collections located in the same project, so
473             # pick the oldest collection that has a name assigned to it.
474             for i in items:
475                 if i.get("name"):
476                     c = i
477                     break
478         if not c:
479             # None of the collections have names (?!), so just pick the
480             # first one.
481             c = items[0]
482
483         # list() doesn't return manifest text (and we don't want it to,
484         # because we don't need the same maninfest text sent to us 50
485         # times) so go and retrieve the collection object directly
486         # which will include the manifest text.
487         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
488     else:
489         # Assume this is an actual collection uuid, so fetch it directly.
490         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
491
492     # If a collection with this hash already exists at the
493     # destination, and 'force' is not true, just return that
494     # collection.
495     if not args.force:
496         if 'portable_data_hash' in c:
497             colhash = c['portable_data_hash']
498         else:
499             colhash = c['uuid']
500         dstcol = dst.collections().list(
501             filters=[['portable_data_hash', '=', colhash]]
502         ).execute(num_retries=args.retries)
503         if dstcol['items_available'] > 0:
504             for d in dstcol['items']:
505                 if ((args.project_uuid == d['owner_uuid']) and
506                     (c.get('name') == d['name']) and
507                     (c['portable_data_hash'] == d['portable_data_hash'])):
508                     return d
509
510             return create_collection_from(c, src, dst, args)
511
512     # Fetch the collection's manifest.
513     manifest = c['manifest_text']
514     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
515
516     # Copy each block from src_keep to dst_keep.
517     # Use the newly signed locators returned from dst_keep to build
518     # a new manifest as we go.
519     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
520     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
521     dst_manifest = ""
522     dst_locators = {}
523     bytes_written = 0
524     bytes_expected = total_collection_size(manifest)
525     if args.progress:
526         progress_writer = ProgressWriter(human_progress)
527     else:
528         progress_writer = None
529
530     for line in manifest.splitlines(True):
531         words = line.split()
532         dst_manifest_line = words[0]
533         for word in words[1:]:
534             try:
535                 loc = arvados.KeepLocator(word)
536                 blockhash = loc.md5sum
537                 # copy this block if we haven't seen it before
538                 # (otherwise, just reuse the existing dst_locator)
539                 if blockhash not in dst_locators:
540                     logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
541                     if progress_writer:
542                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
543                     data = src_keep.get(word)
544                     dst_locator = dst_keep.put(data)
545                     dst_locators[blockhash] = dst_locator
546                     bytes_written += loc.size
547                 dst_manifest_line += ' ' + dst_locators[blockhash]
548             except ValueError:
549                 # If 'word' can't be parsed as a locator,
550                 # presume it's a filename.
551                 dst_manifest_line += ' ' + word
552         dst_manifest += dst_manifest_line
553         if line.endswith("\n"):
554             dst_manifest += "\n"
555
556     if progress_writer:
557         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
558         progress_writer.finish()
559
560     # Copy the manifest and save the collection.
561     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
562
563     dst_keep.put(dst_manifest.encode('utf-8'))
564     c['manifest_text'] = dst_manifest
565     return create_collection_from(c, src, dst, args)
566
567 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
568 #
569 #    Copies commits from git repository 'src_git_repo' on Arvados
570 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
571 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
572 #    or "jsmith")
573 #
574 #    All commits will be copied to a destination branch named for the
575 #    source repository URL.
576 #
577 #    Because users cannot create their own repositories, the
578 #    destination repository must already exist.
579 #
580 #    The user running this command must be authenticated
581 #    to both repositories.
582 #
583 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
584     # Identify the fetch and push URLs for the git repositories.
585     r = src.repositories().list(
586         filters=[['name', '=', src_git_repo]]).execute(num_retries=args.retries)
587     if r['items_available'] != 1:
588         raise Exception('cannot identify source repo {}; {} repos found'
589                         .format(src_git_repo, r['items_available']))
590     src_git_url = r['items'][0]['fetch_url']
591     logger.debug('src_git_url: {}'.format(src_git_url))
592
593     r = dst.repositories().list(
594         filters=[['name', '=', dst_git_repo]]).execute(num_retries=args.retries)
595     if r['items_available'] != 1:
596         raise Exception('cannot identify destination repo {}; {} repos found'
597                         .format(dst_git_repo, r['items_available']))
598     dst_git_push_url  = r['items'][0]['push_url']
599     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
600
601     # script_version is the "script_version" parameter from the source
602     # component or job.  It is used here to tie the destination branch
603     # to the commit that was used on the source.  If no script_version
604     # was supplied in the component or job, it is a mistake in the pipeline,
605     # but for the purposes of copying the repository, default to "master".
606     #
607     if not script_version:
608         script_version = "master"
609
610     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
611
612     # Copy git commits from src repo to dst repo (but only if
613     # we have not already copied this repo in this session).
614     #
615     if src_git_repo in local_repo_dir:
616         logger.debug('already copied src repo %s, skipping', src_git_repo)
617     else:
618         tmprepo = tempfile.mkdtemp()
619         local_repo_dir[src_git_repo] = tmprepo
620         arvados.util.run_command(
621             ["git", "clone", "--bare", src_git_url, tmprepo],
622             cwd=os.path.dirname(tmprepo))
623         arvados.util.run_command(
624             ["git", "branch", dst_branch, script_version],
625             cwd=tmprepo)
626         arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
627         arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
628
629
630 def copy_docker_images(pipeline, src, dst, args):
631     """Copy any docker images named in the pipeline components'
632     runtime_constraints field from src to dst."""
633
634     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
635     for c_name, c_info in pipeline['components'].iteritems():
636         if ('runtime_constraints' in c_info and
637             'docker_image' in c_info['runtime_constraints']):
638             copy_docker_image(
639                 c_info['runtime_constraints']['docker_image'],
640                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
641                 src, dst, args)
642
643
644 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
645     """Copy the docker image identified by docker_image and
646     docker_image_tag from src to dst. Create appropriate
647     docker_image_repo+tag and docker_image_hash links at dst.
648
649     """
650
651     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
652
653     # Find the link identifying this docker image.
654     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
655         src, args.retries, docker_image, docker_image_tag)
656     if docker_image_list:
657         image_uuid, image_info = docker_image_list[0]
658         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
659
660         # Copy the collection it refers to.
661         dst_image_col = copy_collection(image_uuid, src, dst, args)
662     elif arvados.util.keep_locator_pattern.match(docker_image):
663         dst_image_col = copy_collection(docker_image, src, dst, args)
664     else:
665         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
666
667 # git_rev_parse(rev, repo)
668 #
669 #    Returns the 40-character commit hash corresponding to 'rev' in
670 #    git repository 'repo' (which must be the path of a local git
671 #    repository)
672 #
673 def git_rev_parse(rev, repo):
674     gitout, giterr = arvados.util.run_command(
675         ['git', 'rev-parse', rev], cwd=repo)
676     return gitout.strip()
677
678 # uuid_type(api, object_uuid)
679 #
680 #    Returns the name of the class that object_uuid belongs to, based on
681 #    the second field of the uuid.  This function consults the api's
682 #    schema to identify the object class.
683 #
684 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
685 #
686 #    Special case: if handed a Keep locator hash, return 'Collection'.
687 #
688 def uuid_type(api, object_uuid):
689     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
690         return 'Collection'
691     p = object_uuid.split('-')
692     if len(p) == 3:
693         type_prefix = p[1]
694         for k in api._schema.schemas:
695             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
696             if type_prefix == obj_class:
697                 return k
698     return None
699
700 def abort(msg, code=1):
701     logger.info("arv-copy: %s", msg)
702     exit(code)
703
704
705 # Code for reporting on the progress of a collection upload.
706 # Stolen from arvados.commands.put.ArvPutCollectionWriter
707 # TODO(twp): figure out how to refactor into a shared library
708 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
709 # code)
710
711 def machine_progress(obj_uuid, bytes_written, bytes_expected):
712     return "{} {}: {} {} written {} total\n".format(
713         sys.argv[0],
714         os.getpid(),
715         obj_uuid,
716         bytes_written,
717         -1 if (bytes_expected is None) else bytes_expected)
718
719 def human_progress(obj_uuid, bytes_written, bytes_expected):
720     if bytes_expected:
721         return "\r{}: {}M / {}M {:.1%} ".format(
722             obj_uuid,
723             bytes_written >> 20, bytes_expected >> 20,
724             float(bytes_written) / bytes_expected)
725     else:
726         return "\r{}: {} ".format(obj_uuid, bytes_written)
727
728 class ProgressWriter(object):
729     _progress_func = None
730     outfile = sys.stderr
731
732     def __init__(self, progress_func):
733         self._progress_func = progress_func
734
735     def report(self, obj_uuid, bytes_written, bytes_expected):
736         if self._progress_func is not None:
737             self.outfile.write(
738                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
739
740     def finish(self):
741         self.outfile.write("\n")
742
743 if __name__ == '__main__':
744     main()