3699: delete pi['uuid'] before committing
[arvados.git] / sdk / python / arvados / commands / copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import os
21 import re
22 import sets
23 import sys
24 import logging
25 import tempfile
26
27 import arvados
28 import arvados.config
29 import arvados.keep
30
31 logger = logging.getLogger('arvados.arv-copy')
32
33 def main():
34     logger.setLevel(logging.DEBUG)
35
36     parser = argparse.ArgumentParser(
37         description='Copy a pipeline instance from one Arvados instance to another.')
38
39     parser.add_argument(
40         '--recursive', dest='recursive', action='store_true',
41         help='Recursively copy any dependencies for this object. (default)')
42     parser.add_argument(
43         '--no-recursive', dest='recursive', action='store_false',
44         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
45     parser.add_argument(
46         '--dst-git-repo', dest='dst_git_repo',
47         help='The name of the destination git repository.')
48     parser.add_argument(
49         '--project_uuid', dest='project_uuid',
50         help='The UUID of the project at the destination to which the pipeline should be copied.')
51     parser.add_argument(
52         'object_uuid',
53         help='The UUID of the object to be copied.')
54     parser.add_argument(
55         'source_arvados',
56         help='The name of the source Arvados instance.')
57     parser.add_argument(
58         'destination_arvados',
59         help='The name of the destination Arvados instance.')
60     parser.set_defaults(recursive=True)
61
62     args = parser.parse_args()
63
64     # Create API clients for the source and destination instances
65     src_arv = api_for_instance(args.source_arvados)
66     dst_arv = api_for_instance(args.destination_arvados)
67
68     # Identify the kind of object we have been given, and begin copying.
69     t = uuid_type(src_arv, args.object_uuid)
70     if t == 'Collection':
71         result = copy_collection(args.object_uuid, src=src_arv, dst=dst_arv)
72     elif t == 'PipelineInstance':
73         result = copy_pipeline_instance(args.object_uuid,
74                                         src_arv, dst_arv,
75                                         args.dst_git_repo,
76                                         dst_project=args.project_uuid,
77                                         recursive=args.recursive)
78     elif t == 'PipelineTemplate':
79         result = copy_pipeline_template(args.object_uuid,
80                                         src_arv, dst_arv,
81                                         args.dst_git_repo,
82                                         recursive=args.recursive)
83     else:
84         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
85
86     print result
87     exit(0)
88
89 # api_for_instance(instance_name)
90 #
91 #     Creates an API client for the Arvados instance identified by
92 #     instance_name.  Credentials must be stored in
93 #     $HOME/.config/arvados/instance_name.conf
94 #
95 def api_for_instance(instance_name):
96     if '/' in instance_name:
97         abort('illegal instance name {}'.format(instance_name))
98     config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
99     cfg = arvados.config.load(config_file)
100
101     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
102         api_is_insecure = (
103             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
104                 ['1', 't', 'true', 'y', 'yes']))
105         client = arvados.api('v1',
106                              host=cfg['ARVADOS_API_HOST'],
107                              token=cfg['ARVADOS_API_TOKEN'],
108                              insecure=api_is_insecure,
109                              cache=False)
110     else:
111         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
112     return client
113
114 # copy_pipeline_instance(pi_uuid, dst_git_repo, dst_project, recursive, src, dst)
115 #
116 #    Copies a pipeline instance identified by pi_uuid from src to dst.
117 #
118 #    If the 'recursive' option evaluates to True:
119 #      1. Copies all input collections
120 #           * For each component in the pipeline, include all collections
121 #             listed as job dependencies for that component)
122 #      2. Copy docker images
123 #      3. Copy git repositories
124 #      4. Copy the pipeline template
125 #
126 #    The only changes made to the copied pipeline instance are:
127 #      1. The original pipeline instance UUID is preserved in
128 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
129 #      2. The pipeline_template_uuid is changed to the new template uuid.
130 #      3. The owner_uuid of the instance is changed to the user who
131 #         copied it.
132 #
133 def copy_pipeline_instance(pi_uuid, src, dst, dst_git_repo, dst_project=None, recursive=True):
134     # Fetch the pipeline instance record.
135     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
136
137     if recursive:
138         # Copy the pipeline template and save the copied template.
139         if pi.get('pipeline_template_uuid', None):
140             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
141                                         src, dst,
142                                         dst_git_repo,
143                                         recursive=True)
144
145         # Copy input collections, docker images and git repos.
146         pi = copy_collections(pi, src, dst)
147         copy_git_repos(pi, src, dst, dst_git_repo)
148
149         # Update the fields of the pipeline instance with the copied
150         # pipeline template.
151         if pi.get('pipeline_template_uuid', None):
152             pi['pipeline_template_uuid'] = pt['uuid']
153         if dst_project:
154             pi['owner_uuid'] = dst_project
155         else:
156             del pi['owner_uuid']
157
158     else:
159         # not recursive
160         print >>sys.stderr, "Copying only pipeline instance {}.".format(pi_uuid)
161         print >>sys.stderr, "You are responsible for making sure all pipeline dependencies have been updated."
162
163     # Create the new pipeline instance at the destination Arvados.
164     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
165     del pi['uuid']
166     new_pi = dst.pipeline_instances().create(body=pi).execute()
167     return new_pi
168
169 # copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive)
170 #
171 #    Copies a pipeline template identified by pt_uuid from src to dst.
172 #
173 #    If the 'recursive' option evaluates to true, also copy any collections,
174 #    docker images and git repositories that this template references.
175 #
176 #    The owner_uuid of the new template is changed to that of the user
177 #    who copied the template.
178 #
179 #    Returns the copied pipeline template object.
180 #
181 def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True):
182     # fetch the pipeline template from the source instance
183     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
184
185     if recursive:
186         # Copy input collections, docker images and git repos.
187         pt = copy_collections(pt, src, dst)
188         copy_git_repos(pt, src, dst, dst_git_repo)
189
190     pt['name'] = pt['name'] + ' copy'
191     del pt['uuid']
192     del pt['owner_uuid']
193
194     return dst.pipeline_templates().create(body=pt).execute()
195
196 # copy_collections(obj, src, dst)
197 #
198 #    Recursively copies all collections referenced by 'obj' from src
199 #    to dst.
200 #
201 #    Returns a copy of obj with any old collection uuids replaced by
202 #    the new ones.
203 #
204 def copy_collections(obj, src, dst):
205     if type(obj) in [str, unicode]:
206         if uuid_type(src, obj) == 'Collection':
207             newc = copy_collection(obj, src, dst)
208             if obj != newc['uuid'] and obj != newc['portable_data_hash']:
209                 return newc['uuid']
210         return obj
211     elif type(obj) == dict:
212         return {v: copy_collections(obj[v], src, dst) for v in obj}
213     elif type(obj) == list:
214         return [copy_collections(v, src, dst) for v in obj]
215     return obj
216
217 # copy_git_repos(p, src, dst, dst_repo)
218 #
219 #    Copies all git repositories referenced by pipeline instance or
220 #    template 'p' from src to dst.
221 #
222 #    Git repository dependencies are identified by:
223 #      * p['components'][c]['repository']
224 #      * p['components'][c]['job']['repository']
225 #    for each component c in the pipeline.
226 #
227 #    The pipeline object is updated in place with the new repository
228 #    names.  The return value is undefined.
229 #
230 def copy_git_repos(p, src, dst, dst_repo):
231     copied = set()
232     for c in p['components']:
233         component = p['components'][c]
234         if 'repository' in component:
235             repo = component['repository']
236             if repo not in copied:
237                 copy_git_repo(repo, src, dst, dst_repo)
238                 copied.add(repo)
239             component['repository'] = dst_repo
240         if 'job' in component and 'repository' in component['job']:
241             repo = component['job']['repository']
242             if repo not in copied:
243                 copy_git_repo(repo, src, dst, dst_repo)
244                 copied.add(repo)
245             component['job']['repository'] = dst_repo
246
247 # copy_collection(obj_uuid, src, dst)
248 #
249 #    Copies the collection identified by obj_uuid from src to dst.
250 #    Returns the collection object created at dst.
251 #
252 #    For this application, it is critical to preserve the
253 #    collection's manifest hash, which is not guaranteed with the
254 #    arvados.CollectionReader and arvados.CollectionWriter classes.
255 #    Copying each block in the collection manually, followed by
256 #    the manifest block, ensures that the collection's manifest
257 #    hash will not change.
258 #
259 def copy_collection(obj_uuid, src, dst):
260     c = src.collections().get(uuid=obj_uuid).execute()
261
262     # Check whether a collection with this hash already exists
263     # at the destination.  If so, just return that collection.
264     if 'portable_data_hash' in c:
265         colhash = c['portable_data_hash']
266     else:
267         colhash = c['uuid']
268     dstcol = dst.collections().list(
269         filters=[['portable_data_hash', '=', colhash]]
270     ).execute()
271     if dstcol['items_available'] > 0:
272         return dstcol['items'][0]
273
274     # Fetch the collection's manifest.
275     manifest = c['manifest_text']
276     logging.debug('copying collection %s', obj_uuid)
277     logging.debug('manifest_text = %s', manifest)
278
279     # Enumerate the block locators found in the manifest.
280     collection_blocks = sets.Set()
281     src_keep = arvados.keep.KeepClient(src)
282     for line in manifest.splitlines():
283         try:
284             block_hash = line.split()[1]
285             collection_blocks.add(block_hash)
286         except ValueError:
287             abort('bad manifest line in collection {}: {}'.format(obj_uuid, f))
288
289     # Copy each block from src_keep to dst_keep.
290     dst_keep = arvados.keep.KeepClient(dst)
291     for locator in collection_blocks:
292         data = src_keep.get(locator)
293         logger.debug('copying block %s', locator)
294         logger.info("Retrieved %d bytes", len(data))
295         dst_keep.put(data)
296
297     # Copy the manifest and save the collection.
298     logger.debug('saving {} manifest: {}'.format(obj_uuid, manifest))
299     dst_keep.put(manifest)
300     return dst.collections().create(body={"manifest_text": manifest}).execute()
301
302 # copy_git_repo(src_git_repo, src, dst, dst_git_repo)
303 #
304 #    Copies commits from git repository 'src_git_repo' on Arvados
305 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
306 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
307 #    or "jsmith")
308 #
309 #    All commits will be copied to a destination branch named for the
310 #    source repository URL.
311 #
312 #    Because users cannot create their own repositories, the
313 #    destination repository must already exist.
314 #
315 #    The user running this command must be authenticated
316 #    to both repositories.
317 #
318 def copy_git_repo(src_git_repo, src, dst, dst_git_repo):
319     # Identify the fetch and push URLs for the git repositories.
320     r = src.repositories().list(
321         filters=[['name', '=', src_git_repo]]).execute()
322     if r['items_available'] != 1:
323         raise Exception('cannot identify source repo {}; {} repos found'
324                         .format(src_git_repo, r['items_available']))
325     src_git_url = r['items'][0]['fetch_url']
326     logger.debug('src_git_url: {}'.format(src_git_url))
327
328     r = dst.repositories().list(
329         filters=[['name', '=', dst_git_repo]]).execute()
330     if r['items_available'] != 1:
331         raise Exception('cannot identify source repo {}; {} repos found'
332                         .format(dst_git_repo, r['items_available']))
333     dst_git_push_url  = r['items'][0]['push_url']
334     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
335
336     tmprepo = tempfile.mkdtemp()
337
338     dst_branch = re.sub(r'\W+', '_', src_git_url)
339     arvados.util.run_command(
340         ["git", "clone", src_git_url, tmprepo],
341         cwd=os.path.dirname(tmprepo))
342     arvados.util.run_command(
343         ["git", "checkout", "-b", dst_branch],
344         cwd=tmprepo)
345     arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
346     arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
347
348 # uuid_type(api, object_uuid)
349 #
350 #    Returns the name of the class that object_uuid belongs to, based on
351 #    the second field of the uuid.  This function consults the api's
352 #    schema to identify the object class.
353 #
354 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
355 #
356 #    Special case: if handed a Keep locator hash, return 'Collection'.
357 #
358 def uuid_type(api, object_uuid):
359     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
360         return 'Collection'
361     p = object_uuid.split('-')
362     if len(p) == 3:
363         type_prefix = p[1]
364         for k in api._schema.schemas:
365             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
366             if type_prefix == obj_class:
367                 return k
368     return None
369
370 def abort(msg, code=1):
371     print >>sys.stderr, "arv-copy:", msg
372     exit(code)
373
374 if __name__ == '__main__':
375     main()