3699: bugfix: some leftover recursive/force options
[arvados.git] / sdk / python / arvados / commands / copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31 import arvados.util
32
33 logger = logging.getLogger('arvados.arv-copy')
34
35 # local_repo_dir records which git repositories from the Arvados source
36 # instance have been checked out locally during this run, and to which
37 # directories.
38 # e.g. if repository 'twp' from src_arv has been cloned into
39 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
40 #
41 local_repo_dir = {}
42
43 def main():
44     parser = argparse.ArgumentParser(
45         description='Copy a pipeline instance from one Arvados instance to another.')
46
47     parser.add_argument(
48         '-v', '--verbose', dest='verbose', action='store_true',
49         help='Verbose output.')
50     parser.add_argument(
51         '--progress', dest='progress', action='store_true',
52         help='Report progress on copying collections. (default)')
53     parser.add_argument(
54         '--no-progress', dest='progress', action='store_false',
55         help='Do not report progress on copying collections.')
56     parser.add_argument(
57         '-f', '--force', dest='force', action='store_true',
58         help='Perform copy even if the object appears to exist at the remote destination.')
59     parser.add_argument(
60         '--src', dest='source_arvados', required=True,
61         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
62     parser.add_argument(
63         '--dst', dest='destination_arvados', required=True,
64         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
65     parser.add_argument(
66         '--recursive', dest='recursive', action='store_true',
67         help='Recursively copy any dependencies for this object. (default)')
68     parser.add_argument(
69         '--no-recursive', dest='recursive', action='store_false',
70         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
71     parser.add_argument(
72         '--dst-git-repo', dest='dst_git_repo',
73         help='The name of the destination git repository. Required when copying a pipeline recursively.')
74     parser.add_argument(
75         '--project-uuid', dest='project_uuid',
76         help='The UUID of the project at the destination to which the pipeline should be copied.')
77     parser.add_argument(
78         'object_uuid',
79         help='The UUID of the object to be copied.')
80     parser.set_defaults(progress=True)
81     parser.set_defaults(recursive=True)
82
83     args = parser.parse_args()
84
85     if args.verbose:
86         logger.setLevel(logging.DEBUG)
87     else:
88         logger.setLevel(logging.INFO)
89
90     # Create API clients for the source and destination instances
91     src_arv = api_for_instance(args.source_arvados)
92     dst_arv = api_for_instance(args.destination_arvados)
93
94     # Identify the kind of object we have been given, and begin copying.
95     t = uuid_type(src_arv, args.object_uuid)
96     if t == 'Collection':
97         result = copy_collection(args.object_uuid,
98                                  src_arv, dst_arv,
99                                  args)
100     elif t == 'PipelineInstance':
101         result = copy_pipeline_instance(args.object_uuid,
102                                         src_arv, dst_arv,
103                                         args)
104     elif t == 'PipelineTemplate':
105         result = copy_pipeline_template(args.object_uuid,
106                                         src_arv, dst_arv, args)
107     else:
108         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
109
110     # Clean up any outstanding temp git repositories.
111     for d in local_repo_dir.values():
112         shutil.rmtree(d, ignore_errors=True)
113
114     # If no exception was thrown and the response does not have an
115     # error_token field, presume success
116     if 'error_token' in result or 'uuid' not in result:
117         logger.error("API server returned an error result: {}".format(result))
118         exit(1)
119
120     logger.info("")
121     logger.info("Success: created copy with uuid {}".format(result['uuid']))
122     exit(0)
123
124 # api_for_instance(instance_name)
125 #
126 #     Creates an API client for the Arvados instance identified by
127 #     instance_name.
128 #
129 #     If instance_name contains a slash, it is presumed to be a path
130 #     (either local or absolute) to a file with Arvados configuration
131 #     settings.
132 #
133 #     Otherwise, it is presumed to be the name of a file in
134 #     $HOME/.config/arvados/instance_name.conf
135 #
136 def api_for_instance(instance_name):
137     if '/' in instance_name:
138         config_file = instance_name
139     else:
140         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
141
142     try:
143         cfg = arvados.config.load(config_file)
144     except (IOError, OSError) as e:
145         abort(("Could not open config file {}: {}\n" +
146                "You must make sure that your configuration tokens\n" +
147                "for Arvados instance {} are in {} and that this\n" +
148                "file is readable.").format(
149                    config_file, e, instance_name, config_file))
150
151     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
152         api_is_insecure = (
153             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
154                 ['1', 't', 'true', 'y', 'yes']))
155         client = arvados.api('v1',
156                              host=cfg['ARVADOS_API_HOST'],
157                              token=cfg['ARVADOS_API_TOKEN'],
158                              insecure=api_is_insecure,
159                              cache=False)
160     else:
161         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
162     return client
163
164 # copy_pipeline_instance(pi_uuid, src, dst, args)
165 #
166 #    Copies a pipeline instance identified by pi_uuid from src to dst.
167 #
168 #    If the args.recursive option is set:
169 #      1. Copies all input collections
170 #           * For each component in the pipeline, include all collections
171 #             listed as job dependencies for that component)
172 #      2. Copy docker images
173 #      3. Copy git repositories
174 #      4. Copy the pipeline template
175 #
176 #    The only changes made to the copied pipeline instance are:
177 #      1. The original pipeline instance UUID is preserved in
178 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
179 #      2. The pipeline_template_uuid is changed to the new template uuid.
180 #      3. The owner_uuid of the instance is changed to the user who
181 #         copied it.
182 #
183 def copy_pipeline_instance(pi_uuid, src, dst, args):
184     # Fetch the pipeline instance record.
185     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
186
187     if args.recursive:
188         if not args.dst_git_repo:
189             abort('--dst-git-repo is required when copying a pipeline recursively.')
190         # Copy the pipeline template and save the copied template.
191         if pi.get('pipeline_template_uuid', None):
192             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
193                                         src, dst, args)
194
195         # Copy input collections, docker images and git repos.
196         pi = copy_collections(pi, src, dst, args)
197         copy_git_repos(pi, src, dst, args.dst_git_repo)
198
199         # Update the fields of the pipeline instance with the copied
200         # pipeline template.
201         if pi.get('pipeline_template_uuid', None):
202             pi['pipeline_template_uuid'] = pt['uuid']
203
204     else:
205         # not recursive
206         print >>sys.stderr, "Copying only pipeline instance {}.".format(pi_uuid)
207         print >>sys.stderr, "You are responsible for making sure all pipeline dependencies have been updated."
208
209     # Update the pipeline instance properties, and create the new
210     # instance at dst.
211     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
212     pi['description'] = "Pipeline copied from {}\n\n{}".format(
213         pi_uuid, pi.get('description', ''))
214     if dst_project:
215         pi['owner_uuid'] = dst_project
216     else:
217         del pi['owner_uuid']
218     del pi['uuid']
219     pi['ensure_unique_name'] = True
220
221     new_pi = dst.pipeline_instances().create(body=pi).execute()
222     return new_pi
223
224 # copy_pipeline_template(pt_uuid, src, dst, args)
225 #
226 #    Copies a pipeline template identified by pt_uuid from src to dst.
227 #
228 #    If args.recursive is True, also copy any collections, docker
229 #    images and git repositories that this template references.
230 #
231 #    The owner_uuid of the new template is changed to that of the user
232 #    who copied the template.
233 #
234 #    Returns the copied pipeline template object.
235 #
236 def copy_pipeline_template(pt_uuid, src, dst, args):
237     # fetch the pipeline template from the source instance
238     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
239
240     if args.recursive:
241         if not args.dst_git_repo:
242             abort('--dst-git-repo is required when copying a pipeline recursively.')
243         # Copy input collections, docker images and git repos.
244         pt = copy_collections(pt, src, dst, args)
245         copy_git_repos(pt, src, dst, args.dst_git_repo)
246
247     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
248         pt_uuid, pt.get('description', ''))
249     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
250     pt['ensure_unique_name'] = True
251     del pt['uuid']
252     del pt['owner_uuid']
253
254     return dst.pipeline_templates().create(body=pt).execute()
255
256 # copy_collections(obj, src, dst, args)
257 #
258 #    Recursively copies all collections referenced by 'obj' from src
259 #    to dst.
260 #
261 #    Returns a copy of obj with any old collection uuids replaced by
262 #    the new ones.
263 #
264 def copy_collections(obj, src, dst, args):
265     if type(obj) in [str, unicode]:
266         if uuid_type(src, obj) == 'Collection':
267             newc = copy_collection(obj, src, dst, args)
268             if obj != newc['uuid'] and obj != newc['portable_data_hash']:
269                 return newc['uuid']
270         return obj
271     elif type(obj) == dict:
272         return {v: copy_collections(obj[v], src, dst, args) for v in obj}
273     elif type(obj) == list:
274         return [copy_collections(v, src, dst, args) for v in obj]
275     return obj
276
277 # copy_git_repos(p, src, dst, dst_repo)
278 #
279 #    Copies all git repositories referenced by pipeline instance or
280 #    template 'p' from src to dst.
281 #
282 #    For each component c in the pipeline:
283 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
284 #      * Rename script versions:
285 #          * c['script_version']
286 #          * c['job']['script_version']
287 #          * c['job']['supplied_script_version']
288 #        to the commit hashes they resolve to, since any symbolic
289 #        names (tags, branches) are not preserved in the destination repo.
290 #
291 #    The pipeline object is updated in place with the new repository
292 #    names.  The return value is undefined.
293 #
294 def copy_git_repos(p, src, dst, dst_repo):
295     copied = set()
296     for c in p['components']:
297         component = p['components'][c]
298         if 'repository' in component:
299             repo = component['repository']
300             if repo not in copied:
301                 copy_git_repo(repo, src, dst, dst_repo)
302                 copied.add(repo)
303             component['repository'] = dst_repo
304             if 'script_version' in component:
305                 repo_dir = local_repo_dir[repo]
306                 component['script_version'] = git_rev_parse(component['script_version'], repo_dir)
307         if 'job' in component:
308             j = component['job']
309             if 'repository' in j:
310                 repo = j['repository']
311                 if repo not in copied:
312                     copy_git_repo(repo, src, dst, dst_repo)
313                     copied.add(repo)
314                 j['repository'] = dst_repo
315                 repo_dir = local_repo_dir[repo]
316                 if 'script_version' in j:
317                     j['script_version'] = git_rev_parse(j['script_version'], repo_dir)
318                 if 'supplied_script_version' in j:
319                     j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
320
321 def total_collection_size(manifest_text):
322     """Return the total number of bytes in this collection (excluding
323     duplicate blocks)."""
324
325     total_bytes = 0
326     locators_seen = {}
327     for line in manifest_text.splitlines():
328         words = line.split()
329         for word in words[1:]:
330             try:
331                 loc = arvados.KeepLocator(word)
332             except ValueError:
333                 continue  # this word isn't a locator, skip it
334             if loc.md5sum not in locators_seen:
335                 locators_seen[loc.md5sum] = True
336                 total_bytes += loc.size
337
338     return total_bytes
339
340 # copy_collection(obj_uuid, src, dst, args)
341 #
342 #    Copies the collection identified by obj_uuid from src to dst.
343 #    Returns the collection object created at dst.
344 #
345 #    If args.progress is True, produce a human-friendly progress
346 #    report.
347 #
348 #    If a collection with the desired portable_data_hash already
349 #    exists at dst, and args.force is False, copy_collection returns
350 #    the existing collection without copying any blocks.  Otherwise
351 #    (if no collection exists or if args.force is True)
352 #    copy_collection copies all of the collection data blocks from src
353 #    to dst.
354 #
355 #    For this application, it is critical to preserve the
356 #    collection's manifest hash, which is not guaranteed with the
357 #    arvados.CollectionReader and arvados.CollectionWriter classes.
358 #    Copying each block in the collection manually, followed by
359 #    the manifest block, ensures that the collection's manifest
360 #    hash will not change.
361 #
362 def copy_collection(obj_uuid, src, dst, args):
363     c = src.collections().get(uuid=obj_uuid).execute()
364
365     # If a collection with this hash already exists at the
366     # destination, and 'force' is not true, just return that
367     # collection.
368     if not args.force:
369         if 'portable_data_hash' in c:
370             colhash = c['portable_data_hash']
371         else:
372             colhash = c['uuid']
373         dstcol = dst.collections().list(
374             filters=[['portable_data_hash', '=', colhash]]
375         ).execute()
376         if dstcol['items_available'] > 0:
377             logger.debug("Skipping collection %s (already at dst)", obj_uuid)
378             return dstcol['items'][0]
379
380     logger.debug("Copying collection %s", obj_uuid)
381
382     # Fetch the collection's manifest.
383     manifest = c['manifest_text']
384
385     # Copy each block from src_keep to dst_keep.
386     # Use the newly signed locators returned from dst_keep to build
387     # a new manifest as we go.
388     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=2)
389     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=2)
390     dst_manifest = ""
391     dst_locators = {}
392     bytes_written = 0
393     bytes_expected = total_collection_size(manifest)
394     if args.progress:
395         progress_writer = ProgressWriter(human_progress)
396     else:
397         progress_writer = None
398
399     for line in manifest.splitlines():
400         words = line.split()
401         dst_manifest_line = words[0]
402         for word in words[1:]:
403             try:
404                 loc = arvados.KeepLocator(word)
405                 blockhash = loc.md5sum
406                 # copy this block if we haven't seen it before
407                 # (otherwise, just reuse the existing dst_locator)
408                 if blockhash not in dst_locators:
409                     logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
410                     if progress_writer:
411                         progress_writer.report(obj_uuid, bytes_written, bytes_expected)
412                     data = src_keep.get(word)
413                     dst_locator = dst_keep.put(data)
414                     dst_locators[blockhash] = dst_locator
415                     bytes_written += loc.size
416                 dst_manifest_line += ' ' + dst_locators[blockhash]
417             except ValueError:
418                 # If 'word' can't be parsed as a locator,
419                 # presume it's a filename.
420                 dst_manifest_line += ' ' + word
421         dst_manifest += dst_manifest_line + "\n"
422
423     if progress_writer:
424         progress_writer.finish()
425
426     # Copy the manifest and save the collection.
427     logger.debug('saving {} manifest: {}'.format(obj_uuid, dst_manifest))
428     dst_keep.put(dst_manifest)
429
430     if 'uuid' in c:
431         del c['uuid']
432     if 'owner_uuid' in c:
433         del c['owner_uuid']
434     c['ensure_unique_name'] = True
435     c['manifest_text'] = dst_manifest
436     return dst.collections().create(body=c).execute()
437
438 # copy_git_repo(src_git_repo, src, dst, dst_git_repo)
439 #
440 #    Copies commits from git repository 'src_git_repo' on Arvados
441 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
442 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
443 #    or "jsmith")
444 #
445 #    All commits will be copied to a destination branch named for the
446 #    source repository URL.
447 #
448 #    Because users cannot create their own repositories, the
449 #    destination repository must already exist.
450 #
451 #    The user running this command must be authenticated
452 #    to both repositories.
453 #
454 def copy_git_repo(src_git_repo, src, dst, dst_git_repo):
455     # Identify the fetch and push URLs for the git repositories.
456     r = src.repositories().list(
457         filters=[['name', '=', src_git_repo]]).execute()
458     if r['items_available'] != 1:
459         raise Exception('cannot identify source repo {}; {} repos found'
460                         .format(src_git_repo, r['items_available']))
461     src_git_url = r['items'][0]['fetch_url']
462     logger.debug('src_git_url: {}'.format(src_git_url))
463
464     r = dst.repositories().list(
465         filters=[['name', '=', dst_git_repo]]).execute()
466     if r['items_available'] != 1:
467         raise Exception('cannot identify destination repo {}; {} repos found'
468                         .format(dst_git_repo, r['items_available']))
469     dst_git_push_url  = r['items'][0]['push_url']
470     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
471
472     dst_branch = re.sub(r'\W+', '_', src_git_url)
473
474     # Copy git commits from src repo to dst repo (but only if
475     # we have not already copied this repo in this session).
476     #
477     if src_git_repo in local_repo_dir:
478         logger.debug('already copied src repo %s, skipping', src_git_repo)
479     else:
480         tmprepo = tempfile.mkdtemp()
481         local_repo_dir[src_git_repo] = tmprepo
482         arvados.util.run_command(
483             ["git", "clone", src_git_url, tmprepo],
484             cwd=os.path.dirname(tmprepo))
485         arvados.util.run_command(
486             ["git", "checkout", "-b", dst_branch],
487             cwd=tmprepo)
488         arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
489         arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
490
491 # git_rev_parse(rev, repo)
492 #
493 #    Returns the 40-character commit hash corresponding to 'rev' in
494 #    git repository 'repo' (which must be the path of a local git
495 #    repository)
496 #
497 def git_rev_parse(rev, repo):
498     gitout, giterr = arvados.util.run_command(
499         ['git', 'rev-parse', rev], cwd=repo)
500     return gitout.strip()
501
502 # uuid_type(api, object_uuid)
503 #
504 #    Returns the name of the class that object_uuid belongs to, based on
505 #    the second field of the uuid.  This function consults the api's
506 #    schema to identify the object class.
507 #
508 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
509 #
510 #    Special case: if handed a Keep locator hash, return 'Collection'.
511 #
512 def uuid_type(api, object_uuid):
513     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
514         return 'Collection'
515     p = object_uuid.split('-')
516     if len(p) == 3:
517         type_prefix = p[1]
518         for k in api._schema.schemas:
519             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
520             if type_prefix == obj_class:
521                 return k
522     return None
523
524 def abort(msg, code=1):
525     print >>sys.stderr, "arv-copy:", msg
526     exit(code)
527
528
529 # Code for reporting on the progress of a collection upload.
530 # Stolen from arvados.commands.put.ArvPutCollectionWriter
531 # TODO(twp): figure out how to refactor into a shared library
532 # (may involve refactoring some arvados.commands.copy.copy_collection
533 # code)
534
535 def machine_progress(obj_uuid, bytes_written, bytes_expected):
536     return "{} {}: {} {} written {} total\n".format(
537         sys.argv[0],
538         os.getpid(),
539         obj_uuid,
540         bytes_written,
541         -1 if (bytes_expected is None) else bytes_expected)
542
543 def human_progress(obj_uuid, bytes_written, bytes_expected):
544     if bytes_expected:
545         return "\r{}: {}M / {}M {:.1%} ".format(
546             obj_uuid,
547             bytes_written >> 20, bytes_expected >> 20,
548             float(bytes_written) / bytes_expected)
549     else:
550         return "\r{}: {} ".format(obj_uuid, bytes_written)
551
552 class ProgressWriter(object):
553     _progress_func = None
554     outfile = sys.stderr
555
556     def __init__(self, progress_func):
557         self._progress_func = progress_func
558
559     def report(self, obj_uuid, bytes_written, bytes_expected):
560         if self._progress_func is not None:
561             self.outfile.write(
562                 self._progress_func(obj_uuid, bytes_written, bytes_expected))
563
564     def finish(self):
565         self.outfile.write("\n")
566
567 if __name__ == '__main__':
568     main()