3699: bug fix (return value for copy_git_repo)
[arvados.git] / sdk / python / arvados / commands / copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import os
21 import re
22 import sets
23 import sys
24 import logging
25 import tempfile
26
27 import arvados
28 import arvados.config
29 import arvados.keep
30
31 logger = logging.getLogger('arvados.arv-copy')
32
33 def main():
34     logger.setLevel(logging.DEBUG)
35
36     parser = argparse.ArgumentParser(
37         description='Copy a pipeline instance from one Arvados instance to another.')
38
39     parser.add_argument(
40         '--recursive', dest='recursive', action='store_true',
41         help='Recursively copy any dependencies for this object. (default)')
42     parser.add_argument(
43         '--no-recursive', dest='recursive', action='store_false',
44         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
45     parser.add_argument(
46         '--dst-git-repo', dest='dst_git_repo',
47         help='The name of the destination git repository.')
48     parser.add_argument(
49         '--project_uuid', dest='project_uuid',
50         help='The UUID of the project at the destination to which the pipeline should be copied.')
51     parser.add_argument(
52         'object_uuid',
53         help='The UUID of the object to be copied.')
54     parser.add_argument(
55         'source_arvados',
56         help='The name of the source Arvados instance.')
57     parser.add_argument(
58         'destination_arvados',
59         help='The name of the destination Arvados instance.')
60     parser.set_defaults(recursive=True)
61
62     args = parser.parse_args()
63
64     # Create API clients for the source and destination instances
65     src_arv = api_for_instance(args.source_arvados)
66     dst_arv = api_for_instance(args.destination_arvados)
67
68     # Identify the kind of object we have been given, and begin copying.
69     t = uuid_type(src_arv, args.object_uuid)
70     if t == 'Collection':
71         result = copy_collection(args.object_uuid, src=src_arv, dst=dst_arv)
72     elif t == 'PipelineInstance':
73         result = copy_pipeline_instance(args.object_uuid,
74                                         dst_git_repo=args.dst_git_repo,
75                                         dst_project=args.project_uuid,
76                                         recursive=args.recursive,
77                                         src=src_arv, dst=dst_arv)
78     elif t == 'PipelineTemplate':
79         result = copy_pipeline_template(args.object_uuid,
80                                         recursive=args.recursive,
81                                         src=src_arv, dst=dst_arv)
82     else:
83         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
84
85     print result
86     exit(0)
87
88 # api_for_instance(instance_name)
89 #
90 #     Creates an API client for the Arvados instance identified by
91 #     instance_name.  Credentials must be stored in
92 #     $HOME/.config/arvados/instance_name.conf
93 #
94 def api_for_instance(instance_name):
95     if '/' in instance_name:
96         abort('illegal instance name {}'.format(instance_name))
97     config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
98     cfg = arvados.config.load(config_file)
99
100     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
101         api_is_insecure = (
102             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
103                 ['1', 't', 'true', 'y', 'yes']))
104         client = arvados.api('v1',
105                              host=cfg['ARVADOS_API_HOST'],
106                              token=cfg['ARVADOS_API_TOKEN'],
107                              insecure=api_is_insecure,
108                              cache=False)
109     else:
110         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
111     return client
112
113 # copy_pipeline_instance(pi_uuid, dst_git_repo, dst_project, recursive, src, dst)
114 #
115 #    Copies a pipeline instance identified by pi_uuid from src to dst.
116 #
117 #    If the 'recursive' option evaluates to True:
118 #      1. Copies all input collections
119 #           * For each component in the pipeline, include all collections
120 #             listed as job dependencies for that component)
121 #      2. Copy docker images
122 #      3. Copy git repositories
123 #      4. Copy the pipeline template
124 #
125 #    The only changes made to the copied pipeline instance are:
126 #      1. The original pipeline instance UUID is preserved in
127 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
128 #      2. The pipeline_template_uuid is changed to the new template uuid.
129 #      3. The owner_uuid of the instance is changed to the user who
130 #         copied it.
131 #
132 def copy_pipeline_instance(pi_uuid, dst_git_repo=None, dst_project=None, recursive=True, src=None, dst=None):
133     # Fetch the pipeline instance record.
134     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
135     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
136
137     if recursive:
138         # Copy the pipeline template and save the copied template.
139         pt = copy_pipeline_template(pi['pipeline_template_uuid'],
140                                     recursive=True,
141                                     src=src, dst=dst)
142
143         # Copy input collections, docker images and git repos.
144         pi = copy_collections(pi, src, dst)
145         copy_git_repos(pi, dst_git_repo, src, dst)
146
147         # Update the fields of the pipeline instance with the copied
148         # pipeline template.
149         pi['pipeline_template_uuid'] = pt['uuid']
150         if dst_project:
151             pi['owner_uuid'] = dst_project
152         else:
153             del pi['owner_uuid']
154
155     else:
156         # not recursive
157         print >>sys.stderr, "Copying only pipeline instance {}.".format(pi_uuid)
158         print >>sys.stderr, "You are responsible for making sure all pipeline dependencies have been updated."
159
160     # Create the new pipeline instance at the destination Arvados.
161     new_pi = dst.pipeline_instances().create(pipeline_instance=pi).execute()
162     return new_pi
163
164 # copy_pipeline_template(pt_uuid, recursive, src, dst)
165 #
166 #    Copies a pipeline template identified by pt_uuid from src to dst.
167 #
168 #    If the 'recursive' option evaluates to true, also copy any collections,
169 #    docker images and git repositories that this template references.
170 #
171 #    The owner_uuid of the new template is changed to that of the user
172 #    who copied the template.
173 #
174 #    Returns the copied pipeline template object.
175 #
176 def copy_pipeline_template(pt_uuid, recursive=True, src=None, dst=None):
177     # fetch the pipeline template from the source instance
178     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
179
180     if recursive:
181         # Copy input collections, docker images and git repos.
182         pt = copy_collections(pt, src, dst)
183         copy_git_repos(pt, dst_git_repo, src, dst)
184
185     pt['name'] = pt['name'] + ' copy'
186     del pt['uuid']
187     del pt['owner_uuid']
188
189     return dst.pipeline_templates().create(body=pt).execute()
190
191 # copy_collections(obj, src, dst)
192 #
193 #    Recursively copies all collections referenced by 'obj' from src
194 #    to dst.
195 #
196 #    Returns a copy of obj with any old collection uuids replaced by
197 #    the new ones.
198 #
199 def copy_collections(obj, src, dst):
200     if type(obj) in [str, unicode]:
201         if uuid_type(src, obj) == 'Collection':
202             newc = copy_collection(obj, src, dst)
203             if obj != newc['uuid'] and obj != newc['portable_data_hash']:
204                 return newc['uuid']
205         return obj
206     elif type(obj) == dict:
207         return {v: copy_collections(obj[v], src, dst) for v in obj}
208     elif type(obj) == list:
209         return [copy_collections(v, src, dst) for v in obj]
210     return obj
211
212 # copy_git_repos(p, dst_repo, src, dst)
213 #
214 #    Copies all git repositories referenced by pipeline instance or
215 #    template 'p' from src to dst.
216 #
217 #    Git repository dependencies are identified by:
218 #      * p['components'][c]['repository']
219 #      * p['components'][c]['job']['repository']
220 #    for each component c in the pipeline.
221 #
222 #    The pipeline object is updated in place with the new repository
223 #    names.  The return value is undefined.
224 #
225 def copy_git_repos(p, dst_repo, src=None, dst=None):
226     copied = set()
227     for c in p['components']:
228         component = p['components'][c]
229         if 'repository' in component:
230             repo = component['repository']
231             if repo not in copied:
232                 copy_git_repo(repo, dst_repo, src, dst)
233                 copied.add(repo)
234             component['repository'] = dst_repo
235         if 'job' in component and 'repository' in component['job']:
236             repo = component['job']['repository']
237             if repo not in copied:
238                 copy_git_repo(repo, dst_repo, src, dst)
239                 copied.add(repo)
240             component['job']['repository'] = dst_repo
241
242 # copy_collection(obj_uuid, src, dst)
243 #
244 #    Copies the collection identified by obj_uuid from src to dst.
245 #    Returns the collection object created at dst.
246 #
247 #    For this application, it is critical to preserve the
248 #    collection's manifest hash, which is not guaranteed with the
249 #    arvados.CollectionReader and arvados.CollectionWriter classes.
250 #    Copying each block in the collection manually, followed by
251 #    the manifest block, ensures that the collection's manifest
252 #    hash will not change.
253 #
254 def copy_collection(obj_uuid, src=None, dst=None):
255     c = src.collections().get(uuid=obj_uuid).execute()
256
257     # Check whether a collection with this hash already exists
258     # at the destination.  If so, just return that collection.
259     if 'portable_data_hash' in c:
260         colhash = c['portable_data_hash']
261     else:
262         colhash = c['uuid']
263     dstcol = dst.collections().list(
264         filters=[['portable_data_hash', '=', colhash]]
265     ).execute()
266     if dstcol['items_available'] > 0:
267         return dstcol['items'][0]
268
269     # Fetch the collection's manifest.
270     manifest = c['manifest_text']
271     logging.debug('copying collection %s', obj_uuid)
272     logging.debug('manifest_text = %s', manifest)
273
274     # Enumerate the block locators found in the manifest.
275     collection_blocks = sets.Set()
276     src_keep = arvados.keep.KeepClient(src)
277     for line in manifest.splitlines():
278         try:
279             block_hash = line.split()[1]
280             collection_blocks.add(block_hash)
281         except ValueError:
282             abort('bad manifest line in collection {}: {}'.format(obj_uuid, f))
283
284     # Copy each block from src_keep to dst_keep.
285     dst_keep = arvados.keep.KeepClient(dst)
286     for locator in collection_blocks:
287         data = src_keep.get(locator)
288         logger.debug('copying block %s', locator)
289         logger.info("Retrieved %d bytes", len(data))
290         dst_keep.put(data)
291
292     # Copy the manifest and save the collection.
293     logger.debug('saving {} manifest: {}'.format(obj_uuid, manifest))
294     dst_keep.put(manifest)
295     return dst.collections().create(body={"manifest_text": manifest}).execute()
296
297 # copy_git_repo(src_git_repo, dst_git_repo, src, dst)
298 #
299 #    Copies commits from git repository 'src_git_repo' on Arvados
300 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
301 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
302 #    or "jsmith")
303 #
304 #    All commits will be copied to a destination branch named for the
305 #    source repository URL.
306 #
307 #    Because users cannot create their own repositories, the
308 #    destination repository must already exist.
309 #
310 #    The user running this command must be authenticated
311 #    to both repositories.
312 #
313 def copy_git_repo(src_git_repo, dst_git_repo, src=None, dst=None):
314     # Identify the fetch and push URLs for the git repositories.
315     r = src.repositories().list(
316         filters=[['name', '=', src_git_repo]]).execute()
317     if r['items_available'] != 1:
318         raise Exception('cannot identify source repo {}; {} repos found'
319                         .format(src_git_repo, r['items_available']))
320     src_git_url = r['items'][0]['fetch_url']
321     logger.debug('src_git_url: {}'.format(src_git_url))
322
323     r = dst.repositories().list(
324         filters=[['name', '=', dst_git_repo]]).execute()
325     if r['items_available'] != 1:
326         raise Exception('cannot identify source repo {}; {} repos found'
327                         .format(dst_git_repo, r['items_available']))
328     dst_git_push_url  = r['items'][0]['push_url']
329     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
330
331     tmprepo = tempfile.mkdtemp()
332
333     dst_branch = re.sub(r'\W+', '_', src_git_url)
334     arvados.util.run_command(
335         ["git", "clone", src_git_url, tmprepo],
336         cwd=os.path.dirname(tmprepo))
337     arvados.util.run_command(
338         ["git", "checkout", "-b", dst_branch],
339         cwd=tmprepo)
340     arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
341     arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
342
343 # uuid_type(api, object_uuid)
344 #
345 #    Returns the name of the class that object_uuid belongs to, based on
346 #    the second field of the uuid.  This function consults the api's
347 #    schema to identify the object class.
348 #
349 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
350 #
351 #    Special case: if handed a Keep locator hash, return 'Collection'.
352 #
353 def uuid_type(api, object_uuid):
354     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
355         return 'Collection'
356     p = object_uuid.split('-')
357     if len(p) == 3:
358         type_prefix = p[1]
359         for k in api._schema.schemas:
360             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
361             if type_prefix == obj_class:
362                 return k
363     return None
364
365 def abort(msg, code=1):
366     print >>sys.stderr, "arv-copy:", msg
367     exit(code)
368
369 if __name__ == '__main__':
370     main()