5627: Python file-like objects use SEET_SET as the default whence.
[arvados.git] / sdk / python / arvados / commands / arv_copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31 import arvados.util
32 import arvados.commands._util as arv_cmd
33 import arvados.commands.keepdocker
34
35 logger = logging.getLogger('arvados.arv-copy')
36
37 # local_repo_dir records which git repositories from the Arvados source
38 # instance have been checked out locally during this run, and to which
39 # directories.
40 # e.g. if repository 'twp' from src_arv has been cloned into
41 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
42 #
43 local_repo_dir = {}
44
45 # List of collections that have been copied in this session, and their
46 # destination collection UUIDs.
47 collections_copied = {}
48
49 # The owner_uuid of the object being copied
50 src_owner_uuid = None
51
52 def main():
53     copy_opts = argparse.ArgumentParser(add_help=False)
54
55     copy_opts.add_argument(
56         '-v', '--verbose', dest='verbose', action='store_true',
57         help='Verbose output.')
58     copy_opts.add_argument(
59         '--progress', dest='progress', action='store_true',
60         help='Report progress on copying collections. (default)')
61     copy_opts.add_argument(
62         '--no-progress', dest='progress', action='store_false',
63         help='Do not report progress on copying collections.')
64     copy_opts.add_argument(
65         '-f', '--force', dest='force', action='store_true',
66         help='Perform copy even if the object appears to exist at the remote destination.')
67     copy_opts.add_argument(
68         '--src', dest='source_arvados', required=True,
69         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
70     copy_opts.add_argument(
71         '--dst', dest='destination_arvados', required=True,
72         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
73     copy_opts.add_argument(
74         '--recursive', dest='recursive', action='store_true',
75         help='Recursively copy any dependencies for this object. (default)')
76     copy_opts.add_argument(
77         '--no-recursive', dest='recursive', action='store_false',
78         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
79     copy_opts.add_argument(
80         '--dst-git-repo', dest='dst_git_repo',
81         help='The name of the destination git repository. Required when copying a pipeline recursively.')
82     copy_opts.add_argument(
83         '--project-uuid', dest='project_uuid',
84         help='The UUID of the project at the destination to which the pipeline should be copied.')
85     copy_opts.add_argument(
86         'object_uuid',
87         help='The UUID of the object to be copied.')
88     copy_opts.set_defaults(progress=True)
89     copy_opts.set_defaults(recursive=True)
90
91     parser = argparse.ArgumentParser(
92         description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
93         parents=[copy_opts, arv_cmd.retry_opt])
94     args = parser.parse_args()
95
96     if args.verbose:
97         logger.setLevel(logging.DEBUG)
98     else:
99         logger.setLevel(logging.INFO)
100
101     # Create API clients for the source and destination instances
102     src_arv = api_for_instance(args.source_arvados)
103     dst_arv = api_for_instance(args.destination_arvados)
104
105     if not args.project_uuid:
106         args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
107
108     # Identify the kind of object we have been given, and begin copying.
109     t = uuid_type(src_arv, args.object_uuid)
110     if t == 'Collection':
111         set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
112         result = copy_collection(args.object_uuid,
113                                  src_arv, dst_arv,
114                                  args)
115     elif t == 'PipelineInstance':
116         set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
117         result = copy_pipeline_instance(args.object_uuid,
118                                         src_arv, dst_arv,
119                                         args)
120     elif t == 'PipelineTemplate':
121         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
122         result = copy_pipeline_template(args.object_uuid,
123                                         src_arv, dst_arv, args)
124     else:
125         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
126
127     # Clean up any outstanding temp git repositories.
128     for d in local_repo_dir.values():
129         shutil.rmtree(d, ignore_errors=True)
130
131     # If no exception was thrown and the response does not have an
132     # error_token field, presume success
133     if 'error_token' in result or 'uuid' not in result:
134         logger.error("API server returned an error result: {}".format(result))
135         exit(1)
136
137     logger.info("")
138     logger.info("Success: created copy with uuid {}".format(result['uuid']))
139     exit(0)
140
141 def set_src_owner_uuid(resource, uuid, args):
142     c = resource.get(uuid=uuid).execute(num_retries=args.retries)
143     src_owner_uuid = c.get("owner_uuid")
144
145 # api_for_instance(instance_name)
146 #
147 #     Creates an API client for the Arvados instance identified by
148 #     instance_name.
149 #
150 #     If instance_name contains a slash, it is presumed to be a path
151 #     (either local or absolute) to a file with Arvados configuration
152 #     settings.
153 #
154 #     Otherwise, it is presumed to be the name of a file in
155 #     $HOME/.config/arvados/instance_name.conf
156 #
157 def api_for_instance(instance_name):
158     if '/' in instance_name:
159         config_file = instance_name
160     else:
161         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
162
163     try:
164         cfg = arvados.config.load(config_file)
165     except (IOError, OSError) as e:
166         abort(("Could not open config file {}: {}\n" +
167                "You must make sure that your configuration tokens\n" +
168                "for Arvados instance {} are in {} and that this\n" +
169                "file is readable.").format(
170                    config_file, e, instance_name, config_file))
171
172     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
173         api_is_insecure = (
174             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
175                 ['1', 't', 'true', 'y', 'yes']))
176         client = arvados.api('v1',
177                              host=cfg['ARVADOS_API_HOST'],
178                              token=cfg['ARVADOS_API_TOKEN'],
179                              insecure=api_is_insecure)
180     else:
181         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
182     return client
183
184 # copy_pipeline_instance(pi_uuid, src, dst, args)
185 #
186 #    Copies a pipeline instance identified by pi_uuid from src to dst.
187 #
188 #    If the args.recursive option is set:
189 #      1. Copies all input collections
190 #           * For each component in the pipeline, include all collections
191 #             listed as job dependencies for that component)
192 #      2. Copy docker images
193 #      3. Copy git repositories
194 #      4. Copy the pipeline template
195 #
196 #    The only changes made to the copied pipeline instance are:
197 #      1. The original pipeline instance UUID is preserved in
198 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
199 #      2. The pipeline_template_uuid is changed to the new template uuid.
200 #      3. The owner_uuid of the instance is changed to the user who
201 #         copied it.
202 #
203 def copy_pipeline_instance(pi_uuid, src, dst, args):
204     # Fetch the pipeline instance record.
205     pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)
206
207     if args.recursive:
208         if not args.dst_git_repo:
209             abort('--dst-git-repo is required when copying a pipeline recursively.')
210         # Copy the pipeline template and save the copied template.
211         if pi.get('pipeline_template_uuid', None):
212             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
213                                         src, dst, args)
214
215         # Copy input collections, docker images and git repos.
216         pi = copy_collections(pi, src, dst, args)
217         copy_git_repos(pi, src, dst, args.dst_git_repo, args)
218         copy_docker_images(pi, src, dst, args)
219
220         # Update the fields of the pipeline instance with the copied
221         # pipeline template.
222         if pi.get('pipeline_template_uuid', None):
223             pi['pipeline_template_uuid'] = pt['uuid']
224
225     else:
226         # not recursive
227         logger.info("Copying only pipeline instance %s.", pi_uuid)
228         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
229
230     # Update the pipeline instance properties, and create the new
231     # instance at dst.
232     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
233     pi['description'] = "Pipeline copied from {}\n\n{}".format(
234         pi_uuid,
235         pi['description'] if pi.get('description', None) else '')
236
237     pi['owner_uuid'] = args.project_uuid
238
239     del pi['uuid']
240
241     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
242     return new_pi
243
244 # copy_pipeline_template(pt_uuid, src, dst, args)
245 #
246 #    Copies a pipeline template identified by pt_uuid from src to dst.
247 #
248 #    If args.recursive is True, also copy any collections, docker
249 #    images and git repositories that this template references.
250 #
251 #    The owner_uuid of the new template is changed to that of the user
252 #    who copied the template.
253 #
254 #    Returns the copied pipeline template object.
255 #
256 def copy_pipeline_template(pt_uuid, src, dst, args):
257     # fetch the pipeline template from the source instance
258     pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)
259
260     if args.recursive:
261         if not args.dst_git_repo:
262             abort('--dst-git-repo is required when copying a pipeline recursively.')
263         # Copy input collections, docker images and git repos.
264         pt = copy_collections(pt, src, dst, args)
265         copy_git_repos(pt, src, dst, args.dst_git_repo, args)
266         copy_docker_images(pt, src, dst, args)
267
268     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
269         pt_uuid,
270         pt['description'] if pt.get('description', None) else '')
271     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
272     del pt['uuid']
273
274     pt['owner_uuid'] = args.project_uuid
275
276     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
277
278 # copy_collections(obj, src, dst, args)
279 #
280 #    Recursively copies all collections referenced by 'obj' from src
281 #    to dst.  obj may be a dict or a list, in which case we run
282 #    copy_collections on every value it contains. If it is a string,
283 #    search it for any substring that matches a collection hash or uuid
284 #    (this will find hidden references to collections like
285 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
286 #
287 #    Returns a copy of obj with any old collection uuids replaced by
288 #    the new ones.
289 #
290 def copy_collections(obj, src, dst, args):
291
292     def copy_collection_fn(collection_match):
293         """Helper function for regex substitution: copies a single collection,
294         identified by the collection_match MatchObject, to the
295         destination.  Returns the destination collection uuid (or the
296         portable data hash if that's what src_id is).
297
298         """
299         src_id = collection_match.group(0)
300         if src_id not in collections_copied:
301             dst_col = copy_collection(src_id, src, dst, args)
302             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
303                 collections_copied[src_id] = src_id
304             else:
305                 collections_copied[src_id] = dst_col['uuid']
306         return collections_copied[src_id]
307
308     if isinstance(obj, basestring):
309         # Copy any collections identified in this string to dst, replacing
310         # them with the dst uuids as necessary.
311         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
312         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
313         return obj
314     elif type(obj) == dict:
315         return {v: copy_collections(obj[v], src, dst, args) for v in obj}
316     elif type(obj) == list:
317         return [copy_collections(v, src, dst, args) for v in obj]
318     return obj
319
320 # copy_git_repos(p, src, dst, dst_repo, args)
321 #
322 #    Copies all git repositories referenced by pipeline instance or
323 #    template 'p' from src to dst.
324 #
325 #    For each component c in the pipeline:
326 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
327 #      * Rename script versions:
328 #          * c['script_version']
329 #          * c['job']['script_version']
330 #          * c['job']['supplied_script_version']
331 #        to the commit hashes they resolve to, since any symbolic
332 #        names (tags, branches) are not preserved in the destination repo.
333 #
334 #    The pipeline object is updated in place with the new repository
335 #    names.  The return value is undefined.
336 #
337 def copy_git_repos(p, src, dst, dst_repo, args):
338     copied = set()
339     for c in p['components']:
340         component = p['components'][c]
341         if 'repository' in component:
342             repo = component['repository']
343             script_version = component.get('script_version', None)
344             if repo not in copied:
345                 copy_git_repo(repo, src, dst, dst_repo, script_version, args)
346                 copied.add(repo)
347             component['repository'] = dst_repo
348             if script_version:
349                 repo_dir = local_repo_dir[repo]
350                 component['script_version'] = git_rev_parse(script_version, repo_dir)
351         if 'job' in component:
352             j = component['job']
353             if 'repository' in j:
354                 repo = j['repository']
355                 script_version = j.get('script_version', None)
356                 if repo not in copied:
357                     copy_git_repo(repo, src, dst, dst_repo, script_version, args)
358                     copied.add(repo)
359                 j['repository'] = dst_repo
360                 repo_dir = local_repo_dir[repo]
361                 if script_version:
362                     j['script_version'] = git_rev_parse(script_version, repo_dir)
363                 if 'supplied_script_version' in j:
364                     j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
365
366 def total_collection_size(manifest_text):
367     """Return the total number of bytes in this collection (excluding
368     duplicate blocks)."""
369
370     total_bytes = 0
371     locators_seen = {}
372     for line in manifest_text.splitlines():
373         words = line.split()
374         for word in words[1:]:
375             try:
376                 loc = arvados.KeepLocator(word)
377             except ValueError:
378                 continue  # this word isn't a locator, skip it
379             if loc.md5sum not in locators_seen:
380                 locators_seen[loc.md5sum] = True
381                 total_bytes += loc.size
382
383     return total_bytes
384
385 def create_collection_from(c, src, dst, args):
386     """Create a new collection record on dst, and copy Docker metadata if
387     available."""
388
389     collection_uuid = c['uuid']
390     del c['uuid']
391
392     if not c["name"]:
393         c['name'] = "copied from " + collection_uuid
394
395     if 'properties' in c:
396         del c['properties']
397
398     c['owner_uuid'] = args.project_uuid
399
400     dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)
401
402     # Create docker_image_repo+tag and docker_image_hash links
403     # at the destination.
404     for link_class in ("docker_image_repo+tag", "docker_image_hash"):
405         docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
406
407         for d in docker_links:
408             body={
409                 'head_uuid': dst_collection['uuid'],
410                 'link_class': link_class,
411                 'name': d['name'],
412             }
413             body['owner_uuid'] = args.project_uuid
414
415             lk = dst.links().create(body=body).execute(num_retries=args.retries)
416             logger.debug('created dst link {}'.format(lk))
417
418     return dst_collection
419
420 # copy_collection(obj_uuid, src, dst, args)
421 #
422 #    Copies the collection identified by obj_uuid from src to dst.
423 #    Returns the collection object created at dst.
424 #
425 #    If args.progress is True, produce a human-friendly progress
426 #    report.
427 #
428 #    If a collection with the desired portable_data_hash already
429 #    exists at dst, and args.force is False, copy_collection returns
430 #    the existing collection without copying any blocks.  Otherwise
431 #    (if no collection exists or if args.force is True)
432 #    copy_collection copies all of the collection data blocks from src
433 #    to dst.
434 #
435 #    For this application, it is critical to preserve the
436 #    collection's manifest hash, which is not guaranteed with the
437 #    arvados.CollectionReader and arvados.CollectionWriter classes.
438 #    Copying each block in the collection manually, followed by
439 #    the manifest block, ensures that the collection's manifest
440 #    hash will not change.
441 #
442 def copy_collection(obj_uuid, src, dst, args):
443     if arvados.util.keep_locator_pattern.match(obj_uuid):
444         # If the obj_uuid is a portable data hash, it might not be uniquely
445         # identified with a particular collection.  As a result, it is
446         # ambigious as to what name to use for the copy.  Apply some heuristics
447         # to pick which collection to get the name from.
448         srccol = src.collections().list(
449             filters=[['portable_data_hash', '=', obj_uuid]],
450             order="created_at asc"
451             ).execute(num_retries=args.retries)
452
453         items = srccol.get("items")
454
455         if not items:
456             logger.warning("Could not find collection with portable data hash %s", obj_uuid)
457             return
458
459         c = None
460
461         if len(items) == 1:
462             # There's only one collection with the PDH, so use that.
463             c = items[0]
464         if not c:
465             # See if there is a collection that's in the same project
466             # as the root item (usually a pipeline) being copied.
467             for i in items:
468                 if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
469                     c = i
470                     break
471         if not c:
472             # Didn't find any collections located in the same project, so
473             # pick the oldest collection that has a name assigned to it.
474             for i in items:
475                 if i.get("name"):
476                     c = i
477                     break
478         if not c:
479             # None of the collections have names (?!), so just pick the
480             # first one.
481             c = items[0]
482
483         # list() doesn't return manifest text (and we don't want it to,
484         # because we don't need the same maninfest text sent to us 50
485         # times) so go and retrieve the collection object directly
486         # which will include the manifest text.
487         c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
488     else:
489         # Assume this is an actual collection uuid, so fetch it directly.
490         c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
491
492     # If a collection with this hash already exists at the
493     # destination, and 'force' is not true, just return that
494     # collection.
495     if not args.force:
496         if 'portable_data_hash' in c:
497             colhash = c['portable_data_hash']
498         else:
499             colhash = c['uuid']
500         dstcol = dst.collections().list(
501             filters=[['portable_data_hash', '=', colhash]]
502         ).execute(num_retries=args.retries)
503         if dstcol['items_available'] > 0:
504             for d in dstcol['items']:
505                 if ((args.project_uuid == d['owner_uuid']) and
506                     (c.get('name') == d['name']) and
507                     (c['portable_data_hash'] == d['portable_data_hash'])):
508                     return d
509             c['manifest_text'] = dst.collections().get(
510                 uuid=dstcol['items'][0]['uuid']
511             ).execute(num_retries=args.retries)['manifest_text']
512             return create_collection_from(c, src, dst, args)
513
514     # Fetch the collection's manifest.
515     manifest = c['manifest_text']
516     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
517
518     # Copy each block from src_keep to dst_keep.
519     # Use the newly signed locators returned from dst_keep to build
520     # a new manifest as we go.
521     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
522     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
523     dst_manifest = ""
524     dst_locators = {}
525     bytes_written = 0
526     bytes_expected = total_collection_size(manifest)
527     if args.progress:
528         progress_writer = ProgressWriter(human_progress)
529     else:
530         progress_writer = None
531
532     for line in manifest.splitlines(True):
533         words = line.split()
534         dst_manifest_line = words[0]
535         for word in words[1:]:
536             try:
537                 loc = arvados.KeepLocator(word)
538                 blockhash = loc.md5sum
539                 # copy this block if we haven't seen it before
540                 # (otherwise, just reuse the existing dst_locator)
541                 if blockhash not in dst_locators:
542                     logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
543                     if progress_writer:
544                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
545                     data = src_keep.get(word)
546                     dst_locator = dst_keep.put(data)
547                     dst_locators[blockhash] = dst_locator
548                     bytes_written += loc.size
549                 dst_manifest_line += ' ' + dst_locators[blockhash]
550             except ValueError:
551                 # If 'word' can't be parsed as a locator,
552                 # presume it's a filename.
553                 dst_manifest_line += ' ' + word
554         dst_manifest += dst_manifest_line
555         if line.endswith("\n"):
556             dst_manifest += "\n"
557
558     if progress_writer:
559         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
560         progress_writer.finish()
561
562     # Copy the manifest and save the collection.
563     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
564
565     dst_keep.put(dst_manifest.encode('utf-8'))
566     c['manifest_text'] = dst_manifest
567     return create_collection_from(c, src, dst, args)
568
569 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
570 #
571 #    Copies commits from git repository 'src_git_repo' on Arvados
572 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
573 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
574 #    or "jsmith")
575 #
576 #    All commits will be copied to a destination branch named for the
577 #    source repository URL.
578 #
579 #    Because users cannot create their own repositories, the
580 #    destination repository must already exist.
581 #
582 #    The user running this command must be authenticated
583 #    to both repositories.
584 #
585 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
586     # Identify the fetch and push URLs for the git repositories.
587     r = src.repositories().list(
588         filters=[['name', '=', src_git_repo]]).execute(num_retries=args.retries)
589     if r['items_available'] != 1:
590         raise Exception('cannot identify source repo {}; {} repos found'
591                         .format(src_git_repo, r['items_available']))
592     src_git_url = r['items'][0]['fetch_url']
593     logger.debug('src_git_url: {}'.format(src_git_url))
594
595     r = dst.repositories().list(
596         filters=[['name', '=', dst_git_repo]]).execute(num_retries=args.retries)
597     if r['items_available'] != 1:
598         raise Exception('cannot identify destination repo {}; {} repos found'
599                         .format(dst_git_repo, r['items_available']))
600     dst_git_push_url  = r['items'][0]['push_url']
601     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
602
603     # script_version is the "script_version" parameter from the source
604     # component or job.  It is used here to tie the destination branch
605     # to the commit that was used on the source.  If no script_version
606     # was supplied in the component or job, it is a mistake in the pipeline,
607     # but for the purposes of copying the repository, default to "master".
608     #
609     if not script_version:
610         script_version = "master"
611
612     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
613
614     # Copy git commits from src repo to dst repo (but only if
615     # we have not already copied this repo in this session).
616     #
617     if src_git_repo in local_repo_dir:
618         logger.debug('already copied src repo %s, skipping', src_git_repo)
619     else:
620         tmprepo = tempfile.mkdtemp()
621         local_repo_dir[src_git_repo] = tmprepo
622         arvados.util.run_command(
623             ["git", "clone", "--bare", src_git_url, tmprepo],
624             cwd=os.path.dirname(tmprepo))
625         arvados.util.run_command(
626             ["git", "branch", dst_branch, script_version],
627             cwd=tmprepo)
628         arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
629         arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
630
631
632 def copy_docker_images(pipeline, src, dst, args):
633     """Copy any docker images named in the pipeline components'
634     runtime_constraints field from src to dst."""
635
636     logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
637     for c_name, c_info in pipeline['components'].iteritems():
638         if ('runtime_constraints' in c_info and
639             'docker_image' in c_info['runtime_constraints']):
640             copy_docker_image(
641                 c_info['runtime_constraints']['docker_image'],
642                 c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
643                 src, dst, args)
644
645
646 def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
647     """Copy the docker image identified by docker_image and
648     docker_image_tag from src to dst. Create appropriate
649     docker_image_repo+tag and docker_image_hash links at dst.
650
651     """
652
653     logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
654
655     # Find the link identifying this docker image.
656     docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
657         src, args.retries, docker_image, docker_image_tag)
658     if docker_image_list:
659         image_uuid, image_info = docker_image_list[0]
660         logger.debug('copying collection {} {}'.format(image_uuid, image_info))
661
662         # Copy the collection it refers to.
663         dst_image_col = copy_collection(image_uuid, src, dst, args)
664     elif arvados.util.keep_locator_pattern.match(docker_image):
665         dst_image_col = copy_collection(docker_image, src, dst, args)
666     else:
667         logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
668
669 # git_rev_parse(rev, repo)
670 #
671 #    Returns the 40-character commit hash corresponding to 'rev' in
672 #    git repository 'repo' (which must be the path of a local git
673 #    repository)
674 #
675 def git_rev_parse(rev, repo):
676     gitout, giterr = arvados.util.run_command(
677         ['git', 'rev-parse', rev], cwd=repo)
678     return gitout.strip()
679
680 # uuid_type(api, object_uuid)
681 #
682 #    Returns the name of the class that object_uuid belongs to, based on
683 #    the second field of the uuid.  This function consults the api's
684 #    schema to identify the object class.
685 #
686 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
687 #
688 #    Special case: if handed a Keep locator hash, return 'Collection'.
689 #
690 def uuid_type(api, object_uuid):
691     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
692         return 'Collection'
693     p = object_uuid.split('-')
694     if len(p) == 3:
695         type_prefix = p[1]
696         for k in api._schema.schemas:
697             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
698             if type_prefix == obj_class:
699                 return k
700     return None
701
702 def abort(msg, code=1):
703     logger.info("arv-copy: %s", msg)
704     exit(code)
705
706
707 # Code for reporting on the progress of a collection upload.
708 # Stolen from arvados.commands.put.ArvPutCollectionWriter
709 # TODO(twp): figure out how to refactor into a shared library
710 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
711 # code)
712
713 def machine_progress(obj_uuid, bytes_written, bytes_expected):
714     return "{} {}: {} {} written {} total\n".format(
715         sys.argv[0],
716         os.getpid(),
717         obj_uuid,
718         bytes_written,
719         -1 if (bytes_expected is None) else bytes_expected)
720
721 def human_progress(obj_uuid, bytes_written, bytes_expected):
722     if bytes_expected:
723         return "\r{}: {}M / {}M {:.1%} ".format(
724             obj_uuid,
725             bytes_written >> 20, bytes_expected >> 20,
726             float(bytes_written) / bytes_expected)
727     else:
728         return "\r{}: {} ".format(obj_uuid, bytes_written)
729
730 class ProgressWriter(object):
731     _progress_func = None
732     outfile = sys.stderr
733
734     def __init__(self, progress_func):
735         self._progress_func = progress_func
736
737     def report(self, obj_uuid, bytes_written, bytes_expected):
738         if self._progress_func is not None:
739             self.outfile.write(
740                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
741
742     def finish(self):
743         self.outfile.write("\n")
744
745 if __name__ == '__main__':
746     main()