3699: more help text for --src and --dst options
[arvados.git] / sdk / python / arvados / commands / copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import sets
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31
32 logger = logging.getLogger('arvados.arv-copy')
33
34 def main():
35     logger.setLevel(logging.DEBUG)
36
37     parser = argparse.ArgumentParser(
38         description='Copy a pipeline instance from one Arvados instance to another.')
39
40     parser.add_argument(
41         '--src', dest='source_arvados', required=True,
42         help='The name of the source Arvados instance. May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
43     parser.add_argument(
44         '--dst', dest='destination_arvados', required=True,
45         help='The name of the destination Arvados instance (interpreted as in --src)')
46     parser.add_argument(
47         '--recursive', dest='recursive', action='store_true',
48         help='Recursively copy any dependencies for this object. (default)')
49     parser.add_argument(
50         '--no-recursive', dest='recursive', action='store_false',
51         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
52     parser.add_argument(
53         '--dst-git-repo', dest='dst_git_repo',
54         help='The name of the destination git repository. Required when copying a pipeline recursively.')
55     parser.add_argument(
56         '--project_uuid', dest='project_uuid',
57         help='The UUID of the project at the destination to which the pipeline should be copied.')
58     parser.add_argument(
59         'object_uuid',
60         help='The UUID of the object to be copied.')
61     parser.set_defaults(recursive=True)
62
63     args = parser.parse_args()
64
65     # Create API clients for the source and destination instances
66     src_arv = api_for_instance(args.source_arvados)
67     dst_arv = api_for_instance(args.destination_arvados)
68
69     # Identify the kind of object we have been given, and begin copying.
70     t = uuid_type(src_arv, args.object_uuid)
71     if t == 'Collection':
72         result = copy_collection(args.object_uuid, src=src_arv, dst=dst_arv)
73     elif t == 'PipelineInstance':
74         result = copy_pipeline_instance(args.object_uuid,
75                                         src_arv, dst_arv,
76                                         args.dst_git_repo,
77                                         dst_project=args.project_uuid,
78                                         recursive=args.recursive)
79     elif t == 'PipelineTemplate':
80         result = copy_pipeline_template(args.object_uuid,
81                                         src_arv, dst_arv,
82                                         args.dst_git_repo,
83                                         recursive=args.recursive)
84     else:
85         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
86
87     # If no exception was thrown and the response does not have an
88     # error_token field, presume success
89     if 'error_token' in result or 'uuid' not in result:
90         logger.error("API server returned an error result: {}".format(result))
91         exit(1)
92
93     logger.info("")
94     logger.info("Success: created copy with uuid {}".format(result['uuid']))
95     exit(0)
96
97 # api_for_instance(instance_name)
98 #
99 #     Creates an API client for the Arvados instance identified by
100 #     instance_name.
101 #
102 #     If instance_name contains a slash, it is presumed to be a path
103 #     (either local or absolute) to a file with Arvados configuration
104 #     settings.
105 #
106 #     Otherwise, it is presumed to be the name of a file in
107 #     $HOME/.config/arvados/instance_name.conf
108 #
109 def api_for_instance(instance_name):
110     if '/' in instance_name:
111         config_file = instance_name
112     else:
113         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
114
115     try:
116         cfg = arvados.config.load(config_file)
117     except (IOError, OSError) as e:
118         abort(("Could not open config file {}: {}\n" +
119                "You must make sure that your configuration tokens\n" +
120                "for Arvados instance {} are in {} and that this\n" +
121                "file is readable.").format(
122                    config_file, e, instance_name, config_file))
123
124     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
125         api_is_insecure = (
126             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
127                 ['1', 't', 'true', 'y', 'yes']))
128         client = arvados.api('v1',
129                              host=cfg['ARVADOS_API_HOST'],
130                              token=cfg['ARVADOS_API_TOKEN'],
131                              insecure=api_is_insecure,
132                              cache=False)
133     else:
134         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
135     return client
136
137 # copy_pipeline_instance(pi_uuid, dst_git_repo, dst_project, recursive, src, dst)
138 #
139 #    Copies a pipeline instance identified by pi_uuid from src to dst.
140 #
141 #    If the 'recursive' option evaluates to True:
142 #      1. Copies all input collections
143 #           * For each component in the pipeline, include all collections
144 #             listed as job dependencies for that component)
145 #      2. Copy docker images
146 #      3. Copy git repositories
147 #      4. Copy the pipeline template
148 #
149 #    The only changes made to the copied pipeline instance are:
150 #      1. The original pipeline instance UUID is preserved in
151 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
152 #      2. The pipeline_template_uuid is changed to the new template uuid.
153 #      3. The owner_uuid of the instance is changed to the user who
154 #         copied it.
155 #
156 def copy_pipeline_instance(pi_uuid, src, dst, dst_git_repo, dst_project=None, recursive=True):
157     # Fetch the pipeline instance record.
158     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
159
160     if recursive:
161         if not dst_git_repo:
162             abort('--dst-git-repo is required when copying a pipeline recursively.')
163         # Copy the pipeline template and save the copied template.
164         if pi.get('pipeline_template_uuid', None):
165             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
166                                         src, dst,
167                                         dst_git_repo,
168                                         recursive=True)
169
170         # Copy input collections, docker images and git repos.
171         pi = copy_collections(pi, src, dst)
172         copy_git_repos(pi, src, dst, dst_git_repo)
173
174         # Update the fields of the pipeline instance with the copied
175         # pipeline template.
176         if pi.get('pipeline_template_uuid', None):
177             pi['pipeline_template_uuid'] = pt['uuid']
178
179     else:
180         # not recursive
181         print >>sys.stderr, "Copying only pipeline instance {}.".format(pi_uuid)
182         print >>sys.stderr, "You are responsible for making sure all pipeline dependencies have been updated."
183
184     # Update the pipeline instance properties, and create the new
185     # instance at dst.
186     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
187     if dst_project:
188         pi['owner_uuid'] = dst_project
189     else:
190         del pi['owner_uuid']
191     del pi['uuid']
192     pi['ensure_unique_name'] = True
193
194     new_pi = dst.pipeline_instances().create(body=pi).execute()
195     return new_pi
196
197 # copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive)
198 #
199 #    Copies a pipeline template identified by pt_uuid from src to dst.
200 #
201 #    If the 'recursive' option evaluates to true, also copy any collections,
202 #    docker images and git repositories that this template references.
203 #
204 #    The owner_uuid of the new template is changed to that of the user
205 #    who copied the template.
206 #
207 #    Returns the copied pipeline template object.
208 #
209 def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True):
210     # fetch the pipeline template from the source instance
211     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
212
213     if recursive:
214         if not dst_git_repo:
215             abort('--dst-git-repo is required when copying a pipeline recursively.')
216         # Copy input collections, docker images and git repos.
217         pt = copy_collections(pt, src, dst)
218         copy_git_repos(pt, src, dst, dst_git_repo)
219
220     pt['name'] = pt['name'] + ' copy'
221     pt['ensure_unique_name'] = True
222     del pt['uuid']
223     del pt['owner_uuid']
224
225     return dst.pipeline_templates().create(body=pt).execute()
226
227 # copy_collections(obj, src, dst)
228 #
229 #    Recursively copies all collections referenced by 'obj' from src
230 #    to dst.
231 #
232 #    Returns a copy of obj with any old collection uuids replaced by
233 #    the new ones.
234 #
235 def copy_collections(obj, src, dst):
236     if type(obj) in [str, unicode]:
237         if uuid_type(src, obj) == 'Collection':
238             newc = copy_collection(obj, src, dst)
239             if obj != newc['uuid'] and obj != newc['portable_data_hash']:
240                 return newc['uuid']
241         return obj
242     elif type(obj) == dict:
243         return {v: copy_collections(obj[v], src, dst) for v in obj}
244     elif type(obj) == list:
245         return [copy_collections(v, src, dst) for v in obj]
246     return obj
247
248 # copy_git_repos(p, src, dst, dst_repo)
249 #
250 #    Copies all git repositories referenced by pipeline instance or
251 #    template 'p' from src to dst.
252 #
253 #    Git repository dependencies are identified by:
254 #      * p['components'][c]['repository']
255 #      * p['components'][c]['job']['repository']
256 #    for each component c in the pipeline.
257 #
258 #    The pipeline object is updated in place with the new repository
259 #    names.  The return value is undefined.
260 #
261 def copy_git_repos(p, src, dst, dst_repo):
262     copied = set()
263     for c in p['components']:
264         component = p['components'][c]
265         if 'repository' in component:
266             repo = component['repository']
267             if repo not in copied:
268                 copy_git_repo(repo, src, dst, dst_repo)
269                 copied.add(repo)
270             component['repository'] = dst_repo
271         if 'job' in component and 'repository' in component['job']:
272             repo = component['job']['repository']
273             if repo not in copied:
274                 copy_git_repo(repo, src, dst, dst_repo)
275                 copied.add(repo)
276             component['job']['repository'] = dst_repo
277
278 # copy_collection(obj_uuid, src, dst)
279 #
280 #    Copies the collection identified by obj_uuid from src to dst.
281 #    Returns the collection object created at dst.
282 #
283 #    For this application, it is critical to preserve the
284 #    collection's manifest hash, which is not guaranteed with the
285 #    arvados.CollectionReader and arvados.CollectionWriter classes.
286 #    Copying each block in the collection manually, followed by
287 #    the manifest block, ensures that the collection's manifest
288 #    hash will not change.
289 #
290 def copy_collection(obj_uuid, src, dst):
291     c = src.collections().get(uuid=obj_uuid).execute()
292
293     # Check whether a collection with this hash already exists
294     # at the destination.  If so, just return that collection.
295     if 'portable_data_hash' in c:
296         colhash = c['portable_data_hash']
297     else:
298         colhash = c['uuid']
299     dstcol = dst.collections().list(
300         filters=[['portable_data_hash', '=', colhash]]
301     ).execute()
302     if dstcol['items_available'] > 0:
303         return dstcol['items'][0]
304
305     # Fetch the collection's manifest.
306     manifest = c['manifest_text']
307     logging.debug('copying collection %s', obj_uuid)
308     logging.debug('manifest_text = %s', manifest)
309
310     # Enumerate the block locators found in the manifest.
311     collection_blocks = sets.Set()
312     src_keep = arvados.keep.KeepClient(src)
313     for line in manifest.splitlines():
314         try:
315             block_hash = line.split()[1]
316             collection_blocks.add(block_hash)
317         except ValueError:
318             abort('bad manifest line in collection {}: {}'.format(obj_uuid, f))
319
320     # Copy each block from src_keep to dst_keep.
321     dst_keep = arvados.keep.KeepClient(dst)
322     for locator in collection_blocks:
323         data = src_keep.get(locator)
324         logger.debug('copying block %s', locator)
325         logger.info("Retrieved %d bytes", len(data))
326         dst_keep.put(data)
327
328     # Copy the manifest and save the collection.
329     logger.debug('saving {} manifest: {}'.format(obj_uuid, manifest))
330     dst_keep.put(manifest)
331
332     if 'uuid' in c:
333         del c['uuid']
334     if 'owner_uuid' in c:
335         del c['owner_uuid']
336     c['ensure_unique_name'] = True
337     return dst.collections().create(body=c).execute()
338
339 # copy_git_repo(src_git_repo, src, dst, dst_git_repo)
340 #
341 #    Copies commits from git repository 'src_git_repo' on Arvados
342 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
343 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
344 #    or "jsmith")
345 #
346 #    All commits will be copied to a destination branch named for the
347 #    source repository URL.
348 #
349 #    Because users cannot create their own repositories, the
350 #    destination repository must already exist.
351 #
352 #    The user running this command must be authenticated
353 #    to both repositories.
354 #
355 def copy_git_repo(src_git_repo, src, dst, dst_git_repo):
356     # Identify the fetch and push URLs for the git repositories.
357     r = src.repositories().list(
358         filters=[['name', '=', src_git_repo]]).execute()
359     if r['items_available'] != 1:
360         raise Exception('cannot identify source repo {}; {} repos found'
361                         .format(src_git_repo, r['items_available']))
362     src_git_url = r['items'][0]['fetch_url']
363     logger.debug('src_git_url: {}'.format(src_git_url))
364
365     r = dst.repositories().list(
366         filters=[['name', '=', dst_git_repo]]).execute()
367     if r['items_available'] != 1:
368         raise Exception('cannot identify destination repo {}; {} repos found'
369                         .format(dst_git_repo, r['items_available']))
370     dst_git_push_url  = r['items'][0]['push_url']
371     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
372
373     tmprepo = tempfile.mkdtemp()
374
375     dst_branch = re.sub(r'\W+', '_', src_git_url)
376     arvados.util.run_command(
377         ["git", "clone", src_git_url, tmprepo],
378         cwd=os.path.dirname(tmprepo))
379     arvados.util.run_command(
380         ["git", "checkout", "-b", dst_branch],
381         cwd=tmprepo)
382     arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
383     arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
384
385 # uuid_type(api, object_uuid)
386 #
387 #    Returns the name of the class that object_uuid belongs to, based on
388 #    the second field of the uuid.  This function consults the api's
389 #    schema to identify the object class.
390 #
391 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
392 #
393 #    Special case: if handed a Keep locator hash, return 'Collection'.
394 #
395 def uuid_type(api, object_uuid):
396     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
397         return 'Collection'
398     p = object_uuid.split('-')
399     if len(p) == 3:
400         type_prefix = p[1]
401         for k in api._schema.schemas:
402             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
403             if type_prefix == obj_class:
404                 return k
405     return None
406
407 def abort(msg, code=1):
408     print >>sys.stderr, "arv-copy:", msg
409     exit(code)
410
411 if __name__ == '__main__':
412     main()