3699: collection copying bug fixes
[arvados.git] / sdk / python / arvados / commands / copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31 import arvados.util
32
33 logger = logging.getLogger('arvados.arv-copy')
34
35 # local_repo_dir records which git repositories from the Arvados source
36 # instance have been checked out locally during this run, and to which
37 # directories.
38 # e.g. if repository 'twp' from src_arv has been cloned into
39 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
40 #
41 local_repo_dir = {}
42
43 # List of collections that have been copied in this session, and their
44 # destination collection UUIDs.
45 collections_copied = {}
46
47 def main():
48     parser = argparse.ArgumentParser(
49         description='Copy a pipeline instance, template or collection from one Arvados instance to another.')
50
51     parser.add_argument(
52         '-v', '--verbose', dest='verbose', action='store_true',
53         help='Verbose output.')
54     parser.add_argument(
55         '--progress', dest='progress', action='store_true',
56         help='Report progress on copying collections. (default)')
57     parser.add_argument(
58         '--no-progress', dest='progress', action='store_false',
59         help='Do not report progress on copying collections.')
60     parser.add_argument(
61         '-f', '--force', dest='force', action='store_true',
62         help='Perform copy even if the object appears to exist at the remote destination.')
63     parser.add_argument(
64         '--src', dest='source_arvados', required=True,
65         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
66     parser.add_argument(
67         '--dst', dest='destination_arvados', required=True,
68         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
69     parser.add_argument(
70         '--recursive', dest='recursive', action='store_true',
71         help='Recursively copy any dependencies for this object. (default)')
72     parser.add_argument(
73         '--no-recursive', dest='recursive', action='store_false',
74         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
75     parser.add_argument(
76         '--dst-git-repo', dest='dst_git_repo',
77         help='The name of the destination git repository. Required when copying a pipeline recursively.')
78     parser.add_argument(
79         '--project-uuid', dest='project_uuid',
80         help='The UUID of the project at the destination to which the pipeline should be copied.')
81     parser.add_argument(
82         'object_uuid',
83         help='The UUID of the object to be copied.')
84     parser.set_defaults(progress=True)
85     parser.set_defaults(recursive=True)
86
87     args = parser.parse_args()
88
89     if args.verbose:
90         logger.setLevel(logging.DEBUG)
91     else:
92         logger.setLevel(logging.INFO)
93
94     # Create API clients for the source and destination instances
95     src_arv = api_for_instance(args.source_arvados)
96     dst_arv = api_for_instance(args.destination_arvados)
97
98     # Identify the kind of object we have been given, and begin copying.
99     t = uuid_type(src_arv, args.object_uuid)
100     if t == 'Collection':
101         result = copy_collection(args.object_uuid,
102                                  src_arv, dst_arv,
103                                  args)
104     elif t == 'PipelineInstance':
105         result = copy_pipeline_instance(args.object_uuid,
106                                         src_arv, dst_arv,
107                                         args)
108     elif t == 'PipelineTemplate':
109         result = copy_pipeline_template(args.object_uuid,
110                                         src_arv, dst_arv, args)
111     else:
112         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
113
114     # Clean up any outstanding temp git repositories.
115     for d in local_repo_dir.values():
116         shutil.rmtree(d, ignore_errors=True)
117
118     # If no exception was thrown and the response does not have an
119     # error_token field, presume success
120     if 'error_token' in result or 'uuid' not in result:
121         logger.error("API server returned an error result: {}".format(result))
122         exit(1)
123
124     logger.info("")
125     logger.info("Success: created copy with uuid {}".format(result['uuid']))
126     exit(0)
127
128 # api_for_instance(instance_name)
129 #
130 #     Creates an API client for the Arvados instance identified by
131 #     instance_name.
132 #
133 #     If instance_name contains a slash, it is presumed to be a path
134 #     (either local or absolute) to a file with Arvados configuration
135 #     settings.
136 #
137 #     Otherwise, it is presumed to be the name of a file in
138 #     $HOME/.config/arvados/instance_name.conf
139 #
140 def api_for_instance(instance_name):
141     if '/' in instance_name:
142         config_file = instance_name
143     else:
144         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
145
146     try:
147         cfg = arvados.config.load(config_file)
148     except (IOError, OSError) as e:
149         abort(("Could not open config file {}: {}\n" +
150                "You must make sure that your configuration tokens\n" +
151                "for Arvados instance {} are in {} and that this\n" +
152                "file is readable.").format(
153                    config_file, e, instance_name, config_file))
154
155     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
156         api_is_insecure = (
157             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
158                 ['1', 't', 'true', 'y', 'yes']))
159         client = arvados.api('v1',
160                              host=cfg['ARVADOS_API_HOST'],
161                              token=cfg['ARVADOS_API_TOKEN'],
162                              insecure=api_is_insecure,
163                              cache=False)
164     else:
165         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
166     return client
167
168 # copy_pipeline_instance(pi_uuid, src, dst, args)
169 #
170 #    Copies a pipeline instance identified by pi_uuid from src to dst.
171 #
172 #    If the args.recursive option is set:
173 #      1. Copies all input collections
174 #           * For each component in the pipeline, include all collections
175 #             listed as job dependencies for that component)
176 #      2. Copy docker images
177 #      3. Copy git repositories
178 #      4. Copy the pipeline template
179 #
180 #    The only changes made to the copied pipeline instance are:
181 #      1. The original pipeline instance UUID is preserved in
182 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
183 #      2. The pipeline_template_uuid is changed to the new template uuid.
184 #      3. The owner_uuid of the instance is changed to the user who
185 #         copied it.
186 #
187 def copy_pipeline_instance(pi_uuid, src, dst, args):
188     # Fetch the pipeline instance record.
189     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
190
191     if args.recursive:
192         if not args.dst_git_repo:
193             abort('--dst-git-repo is required when copying a pipeline recursively.')
194         # Copy the pipeline template and save the copied template.
195         if pi.get('pipeline_template_uuid', None):
196             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
197                                         src, dst, args)
198
199         # Copy input collections, docker images and git repos.
200         pi = copy_collections(pi, src, dst, args)
201         copy_git_repos(pi, src, dst, args.dst_git_repo)
202
203         # Update the fields of the pipeline instance with the copied
204         # pipeline template.
205         if pi.get('pipeline_template_uuid', None):
206             pi['pipeline_template_uuid'] = pt['uuid']
207
208     else:
209         # not recursive
210         logger.info("Copying only pipeline instance %s.", pi_uuid)
211         logger.info("You are responsible for making sure all pipeline dependencies have been updated.")
212
213     # Update the pipeline instance properties, and create the new
214     # instance at dst.
215     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
216     pi['description'] = "Pipeline copied from {}\n\n{}".format(
217         pi_uuid, pi.get('description', ''))
218     if args.project_uuid:
219         pi['owner_uuid'] = args.project_uuid
220     else:
221         del pi['owner_uuid']
222     del pi['uuid']
223
224     new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute()
225     return new_pi
226
227 # copy_pipeline_template(pt_uuid, src, dst, args)
228 #
229 #    Copies a pipeline template identified by pt_uuid from src to dst.
230 #
231 #    If args.recursive is True, also copy any collections, docker
232 #    images and git repositories that this template references.
233 #
234 #    The owner_uuid of the new template is changed to that of the user
235 #    who copied the template.
236 #
237 #    Returns the copied pipeline template object.
238 #
239 def copy_pipeline_template(pt_uuid, src, dst, args):
240     # fetch the pipeline template from the source instance
241     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
242
243     if args.recursive:
244         if not args.dst_git_repo:
245             abort('--dst-git-repo is required when copying a pipeline recursively.')
246         # Copy input collections, docker images and git repos.
247         pt = copy_collections(pt, src, dst, args)
248         copy_git_repos(pt, src, dst, args.dst_git_repo)
249
250     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
251         pt_uuid, pt.get('description', ''))
252     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
253     del pt['uuid']
254     del pt['owner_uuid']
255
256     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute()
257
258 # copy_collections(obj, src, dst, args)
259 #
260 #    Recursively copies all collections referenced by 'obj' from src
261 #    to dst.  obj may be a dict or a list, in which case we run
262 #    copy_collections on every value it contains. If it is a string,
263 #    search it for any substring that matches a collection hash or uuid
264 #    (this will find hidden references to collections like
265 #      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
266 #
267 #    Returns a copy of obj with any old collection uuids replaced by
268 #    the new ones.
269 #
270 def copy_collections(obj, src, dst, args):
271
272     def copy_collection_fn(src_id):
273         """Helper function for regex substitution: copies a single collection
274         identified by 'src_id' to the destination.  Returns the
275         destination collection uuid (or the portable data hash if
276         that's what src_id is).
277
278         """
279         if src_id not in collections_copied:
280             dst_col = copy_collection(src_id, src, dst, args)
281             if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
282                 collections_copied[src_id] = src_id
283             else:
284                 collections_copied[src_id] = dst_col['uuid']
285         return collections_copied[src_id]
286
287     if isinstance(obj, basestring):
288         # Copy any collections identified in this string to dst, replacing
289         # them with the dst uuids as necessary.
290         obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
291         obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
292         return obj
293     elif type(obj) == dict:
294         return {v: copy_collections(obj[v], src, dst, args) for v in obj}
295     elif type(obj) == list:
296         return [copy_collections(v, src, dst, args) for v in obj]
297     return obj
298
299 # copy_git_repos(p, src, dst, dst_repo)
300 #
301 #    Copies all git repositories referenced by pipeline instance or
302 #    template 'p' from src to dst.
303 #
304 #    For each component c in the pipeline:
305 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
306 #      * Rename script versions:
307 #          * c['script_version']
308 #          * c['job']['script_version']
309 #          * c['job']['supplied_script_version']
310 #        to the commit hashes they resolve to, since any symbolic
311 #        names (tags, branches) are not preserved in the destination repo.
312 #
313 #    The pipeline object is updated in place with the new repository
314 #    names.  The return value is undefined.
315 #
316 def copy_git_repos(p, src, dst, dst_repo):
317     copied = set()
318     for c in p['components']:
319         component = p['components'][c]
320         if 'repository' in component:
321             repo = component['repository']
322             script_version = component.get('script_version', None)
323             if repo not in copied:
324                 copy_git_repo(repo, src, dst, dst_repo, script_version)
325                 copied.add(repo)
326             component['repository'] = dst_repo
327             if script_version:
328                 repo_dir = local_repo_dir[repo]
329                 component['script_version'] = git_rev_parse(script_version, repo_dir)
330         if 'job' in component:
331             j = component['job']
332             if 'repository' in j:
333                 repo = j['repository']
334                 script_version = j.get('script_version', None)
335                 if repo not in copied:
336                     copy_git_repo(repo, src, dst, dst_repo, script_version)
337                     copied.add(repo)
338                 j['repository'] = dst_repo
339                 repo_dir = local_repo_dir[repo]
340                 if script_version:
341                     j['script_version'] = git_rev_parse(script_version, repo_dir)
342                 if 'supplied_script_version' in j:
343                     j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
344
345 def total_collection_size(manifest_text):
346     """Return the total number of bytes in this collection (excluding
347     duplicate blocks)."""
348
349     total_bytes = 0
350     locators_seen = {}
351     for line in manifest_text.splitlines():
352         words = line.split()
353         for word in words[1:]:
354             try:
355                 loc = arvados.KeepLocator(word)
356             except ValueError:
357                 continue  # this word isn't a locator, skip it
358             if loc.md5sum not in locators_seen:
359                 locators_seen[loc.md5sum] = True
360                 total_bytes += loc.size
361
362     return total_bytes
363
364 # copy_collection(obj_uuid, src, dst, args)
365 #
366 #    Copies the collection identified by obj_uuid from src to dst.
367 #    Returns the collection object created at dst.
368 #
369 #    If args.progress is True, produce a human-friendly progress
370 #    report.
371 #
372 #    If a collection with the desired portable_data_hash already
373 #    exists at dst, and args.force is False, copy_collection returns
374 #    the existing collection without copying any blocks.  Otherwise
375 #    (if no collection exists or if args.force is True)
376 #    copy_collection copies all of the collection data blocks from src
377 #    to dst.
378 #
379 #    For this application, it is critical to preserve the
380 #    collection's manifest hash, which is not guaranteed with the
381 #    arvados.CollectionReader and arvados.CollectionWriter classes.
382 #    Copying each block in the collection manually, followed by
383 #    the manifest block, ensures that the collection's manifest
384 #    hash will not change.
385 #
386 def copy_collection(obj_uuid, src, dst, args):
387     c = src.collections().get(uuid=obj_uuid).execute()
388
389     # If a collection with this hash already exists at the
390     # destination, and 'force' is not true, just return that
391     # collection.
392     if not args.force:
393         if 'portable_data_hash' in c:
394             colhash = c['portable_data_hash']
395         else:
396             colhash = c['uuid']
397         dstcol = dst.collections().list(
398             filters=[['portable_data_hash', '=', colhash]]
399         ).execute()
400         if dstcol['items_available'] > 0:
401             logger.debug("Skipping collection %s (already at dst)", obj_uuid)
402             return dstcol['items'][0]
403
404     # Fetch the collection's manifest.
405     manifest = c['manifest_text']
406     logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
407
408     # Copy each block from src_keep to dst_keep.
409     # Use the newly signed locators returned from dst_keep to build
410     # a new manifest as we go.
411     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=2)
412     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=2)
413     dst_manifest = ""
414     dst_locators = {}
415     bytes_written = 0
416     bytes_expected = total_collection_size(manifest)
417     if args.progress:
418         progress_writer = ProgressWriter(human_progress)
419     else:
420         progress_writer = None
421
422     for line in manifest.splitlines():
423         words = line.split()
424         dst_manifest_line = words[0]
425         for word in words[1:]:
426             try:
427                 loc = arvados.KeepLocator(word)
428                 blockhash = loc.md5sum
429                 # copy this block if we haven't seen it before
430                 # (otherwise, just reuse the existing dst_locator)
431                 if blockhash not in dst_locators:
432                     logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
433                     if progress_writer:
434                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
435                     data = src_keep.get(word)
436                     dst_locator = dst_keep.put(data)
437                     dst_locators[blockhash] = dst_locator
438                     bytes_written += loc.size
439                 dst_manifest_line += ' ' + dst_locators[blockhash]
440             except ValueError:
441                 # If 'word' can't be parsed as a locator,
442                 # presume it's a filename.
443                 dst_manifest_line += ' ' + word
444         dst_manifest += dst_manifest_line
445         if line.endswith("\n"):
446             dst_manifest += "\n"
447
448     if progress_writer:
449         progress_writer.finish()
450
451     # Copy the manifest and save the collection.
452     logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
453     dst_keep.put(dst_manifest)
454
455     if 'uuid' in c:
456         del c['uuid']
457     if 'owner_uuid' in c:
458         del c['owner_uuid']
459     c['manifest_text'] = dst_manifest
460     return dst.collections().create(body=c, ensure_unique_name=True).execute()
461
462 # copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version)
463 #
464 #    Copies commits from git repository 'src_git_repo' on Arvados
465 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
466 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
467 #    or "jsmith")
468 #
469 #    All commits will be copied to a destination branch named for the
470 #    source repository URL.
471 #
472 #    Because users cannot create their own repositories, the
473 #    destination repository must already exist.
474 #
475 #    The user running this command must be authenticated
476 #    to both repositories.
477 #
478 def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version):
479     # Identify the fetch and push URLs for the git repositories.
480     r = src.repositories().list(
481         filters=[['name', '=', src_git_repo]]).execute()
482     if r['items_available'] != 1:
483         raise Exception('cannot identify source repo {}; {} repos found'
484                         .format(src_git_repo, r['items_available']))
485     src_git_url = r['items'][0]['fetch_url']
486     logger.debug('src_git_url: {}'.format(src_git_url))
487
488     r = dst.repositories().list(
489         filters=[['name', '=', dst_git_repo]]).execute()
490     if r['items_available'] != 1:
491         raise Exception('cannot identify destination repo {}; {} repos found'
492                         .format(dst_git_repo, r['items_available']))
493     dst_git_push_url  = r['items'][0]['push_url']
494     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
495
496     # script_version is the "script_version" parameter from the source
497     # component or job.  It is used here to tie the destination branch
498     # to the commit that was used on the source.  If no script_version
499     # was supplied in the component or job, it is a mistake in the pipeline,
500     # but for the purposes of copying the repository, default to "master".
501     #
502     if not script_version:
503         script_version = "master"
504
505     dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))
506
507     # Copy git commits from src repo to dst repo (but only if
508     # we have not already copied this repo in this session).
509     #
510     if src_git_repo in local_repo_dir:
511         logger.debug('already copied src repo %s, skipping', src_git_repo)
512     else:
513         tmprepo = tempfile.mkdtemp()
514         local_repo_dir[src_git_repo] = tmprepo
515         arvados.util.run_command(
516             ["git", "clone", "--bare", src_git_url, tmprepo],
517             cwd=os.path.dirname(tmprepo))
518         arvados.util.run_command(
519             ["git", "branch", dst_branch, script_version],
520             cwd=tmprepo)
521         arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
522         arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
523
524 # git_rev_parse(rev, repo)
525 #
526 #    Returns the 40-character commit hash corresponding to 'rev' in
527 #    git repository 'repo' (which must be the path of a local git
528 #    repository)
529 #
530 def git_rev_parse(rev, repo):
531     gitout, giterr = arvados.util.run_command(
532         ['git', 'rev-parse', rev], cwd=repo)
533     return gitout.strip()
534
535 # uuid_type(api, object_uuid)
536 #
537 #    Returns the name of the class that object_uuid belongs to, based on
538 #    the second field of the uuid.  This function consults the api's
539 #    schema to identify the object class.
540 #
541 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
542 #
543 #    Special case: if handed a Keep locator hash, return 'Collection'.
544 #
545 def uuid_type(api, object_uuid):
546     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
547         return 'Collection'
548     p = object_uuid.split('-')
549     if len(p) == 3:
550         type_prefix = p[1]
551         for k in api._schema.schemas:
552             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
553             if type_prefix == obj_class:
554                 return k
555     return None
556
557 def abort(msg, code=1):
558     logger.info("arv-copy:", msg)
559     exit(code)
560
561
562 # Code for reporting on the progress of a collection upload.
563 # Stolen from arvados.commands.put.ArvPutCollectionWriter
564 # TODO(twp): figure out how to refactor into a shared library
565 # (may involve refactoring some arvados.commands.copy.copy_collection
566 # code)
567
568 def machine_progress(obj_uuid, bytes_written, bytes_expected):
569     return "{} {}: {} {} written {} total\n".format(
570         sys.argv[0],
571         os.getpid(),
572         obj_uuid,
573         bytes_written,
574         -1 if (bytes_expected is None) else bytes_expected)
575
576 def human_progress(obj_uuid, bytes_written, bytes_expected):
577     if bytes_expected:
578         return "\r{}: {}M / {}M {:.1%} ".format(
579             obj_uuid,
580             bytes_written >> 20, bytes_expected >> 20,
581             float(bytes_written) / bytes_expected)
582     else:
583         return "\r{}: {} ".format(obj_uuid, bytes_written)
584
585 class ProgressWriter(object):
586     _progress_func = None
587     outfile = sys.stderr
588
589     def __init__(self, progress_func):
590         self._progress_func = progress_func
591
592     def report(self, obj_uuid, bytes_written, bytes_expected):
593         if self._progress_func is not None:
594             self.outfile.write(
595                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
596
597     def finish(self):
598         self.outfile.write("\n")
599
600 if __name__ == '__main__':
601     main()