3699: report success/failure unambiguously
[arvados.git] / sdk / python / arvados / commands / copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import sets
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31
32 logger = logging.getLogger('arvados.arv-copy')
33
34 def main():
35     logger.setLevel(logging.DEBUG)
36
37     parser = argparse.ArgumentParser(
38         description='Copy a pipeline instance from one Arvados instance to another.')
39
40     parser.add_argument(
41         '--recursive', dest='recursive', action='store_true',
42         help='Recursively copy any dependencies for this object. (default)')
43     parser.add_argument(
44         '--no-recursive', dest='recursive', action='store_false',
45         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
46     parser.add_argument(
47         '--dst-git-repo', dest='dst_git_repo',
48         help='The name of the destination git repository.')
49     parser.add_argument(
50         '--project_uuid', dest='project_uuid',
51         help='The UUID of the project at the destination to which the pipeline should be copied.')
52     parser.add_argument(
53         'object_uuid',
54         help='The UUID of the object to be copied.')
55     parser.add_argument(
56         'source_arvados',
57         help='The name of the source Arvados instance.')
58     parser.add_argument(
59         'destination_arvados',
60         help='The name of the destination Arvados instance.')
61     parser.set_defaults(recursive=True)
62
63     args = parser.parse_args()
64
65     # Create API clients for the source and destination instances
66     src_arv = api_for_instance(args.source_arvados)
67     dst_arv = api_for_instance(args.destination_arvados)
68
69     # Identify the kind of object we have been given, and begin copying.
70     t = uuid_type(src_arv, args.object_uuid)
71     if t == 'Collection':
72         result = copy_collection(args.object_uuid, src=src_arv, dst=dst_arv)
73     elif t == 'PipelineInstance':
74         result = copy_pipeline_instance(args.object_uuid,
75                                         src_arv, dst_arv,
76                                         args.dst_git_repo,
77                                         dst_project=args.project_uuid,
78                                         recursive=args.recursive)
79     elif t == 'PipelineTemplate':
80         result = copy_pipeline_template(args.object_uuid,
81                                         src_arv, dst_arv,
82                                         args.dst_git_repo,
83                                         recursive=args.recursive)
84     else:
85         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
86
87     # If no exception was thrown and the response does not have an
88     # error_token field, presume success
89     if 'error_token' in result or 'uuid' not in result:
90         logger.error("API server returned an error result: {}".format(result))
91         exit(1)
92
93     logger.info("")
94     logger.info("Success: created copy with uuid {}".format(result['uuid']))
95     exit(0)
96
97 # api_for_instance(instance_name)
98 #
99 #     Creates an API client for the Arvados instance identified by
100 #     instance_name.  Credentials must be stored in
101 #     $HOME/.config/arvados/instance_name.conf
102 #
103 def api_for_instance(instance_name):
104     if '/' in instance_name:
105         abort('illegal instance name {}'.format(instance_name))
106     config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
107     cfg = arvados.config.load(config_file)
108
109     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
110         api_is_insecure = (
111             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
112                 ['1', 't', 'true', 'y', 'yes']))
113         client = arvados.api('v1',
114                              host=cfg['ARVADOS_API_HOST'],
115                              token=cfg['ARVADOS_API_TOKEN'],
116                              insecure=api_is_insecure,
117                              cache=False)
118     else:
119         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
120     return client
121
122 # copy_pipeline_instance(pi_uuid, dst_git_repo, dst_project, recursive, src, dst)
123 #
124 #    Copies a pipeline instance identified by pi_uuid from src to dst.
125 #
126 #    If the 'recursive' option evaluates to True:
127 #      1. Copies all input collections
128 #           * For each component in the pipeline, include all collections
129 #             listed as job dependencies for that component)
130 #      2. Copy docker images
131 #      3. Copy git repositories
132 #      4. Copy the pipeline template
133 #
134 #    The only changes made to the copied pipeline instance are:
135 #      1. The original pipeline instance UUID is preserved in
136 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
137 #      2. The pipeline_template_uuid is changed to the new template uuid.
138 #      3. The owner_uuid of the instance is changed to the user who
139 #         copied it.
140 #
141 def copy_pipeline_instance(pi_uuid, src, dst, dst_git_repo, dst_project=None, recursive=True):
142     # Fetch the pipeline instance record.
143     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
144
145     if recursive:
146         # Copy the pipeline template and save the copied template.
147         if pi.get('pipeline_template_uuid', None):
148             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
149                                         src, dst,
150                                         dst_git_repo,
151                                         recursive=True)
152
153         # Copy input collections, docker images and git repos.
154         pi = copy_collections(pi, src, dst)
155         copy_git_repos(pi, src, dst, dst_git_repo)
156
157         # Update the fields of the pipeline instance with the copied
158         # pipeline template.
159         if pi.get('pipeline_template_uuid', None):
160             pi['pipeline_template_uuid'] = pt['uuid']
161
162     else:
163         # not recursive
164         print >>sys.stderr, "Copying only pipeline instance {}.".format(pi_uuid)
165         print >>sys.stderr, "You are responsible for making sure all pipeline dependencies have been updated."
166
167     # Update the pipeline instance properties, and create the new
168     # instance at dst.
169     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
170     if dst_project:
171         pi['owner_uuid'] = dst_project
172     else:
173         del pi['owner_uuid']
174     del pi['uuid']
175     pi['ensure_unique_name'] = True
176
177     new_pi = dst.pipeline_instances().create(body=pi).execute()
178     return new_pi
179
180 # copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive)
181 #
182 #    Copies a pipeline template identified by pt_uuid from src to dst.
183 #
184 #    If the 'recursive' option evaluates to true, also copy any collections,
185 #    docker images and git repositories that this template references.
186 #
187 #    The owner_uuid of the new template is changed to that of the user
188 #    who copied the template.
189 #
190 #    Returns the copied pipeline template object.
191 #
192 def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True):
193     # fetch the pipeline template from the source instance
194     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
195
196     if recursive:
197         # Copy input collections, docker images and git repos.
198         pt = copy_collections(pt, src, dst)
199         copy_git_repos(pt, src, dst, dst_git_repo)
200
201     pt['name'] = pt['name'] + ' copy'
202     pt['ensure_unique_name'] = True
203     del pt['uuid']
204     del pt['owner_uuid']
205
206     return dst.pipeline_templates().create(body=pt).execute()
207
208 # copy_collections(obj, src, dst)
209 #
210 #    Recursively copies all collections referenced by 'obj' from src
211 #    to dst.
212 #
213 #    Returns a copy of obj with any old collection uuids replaced by
214 #    the new ones.
215 #
216 def copy_collections(obj, src, dst):
217     if type(obj) in [str, unicode]:
218         if uuid_type(src, obj) == 'Collection':
219             newc = copy_collection(obj, src, dst)
220             if obj != newc['uuid'] and obj != newc['portable_data_hash']:
221                 return newc['uuid']
222         return obj
223     elif type(obj) == dict:
224         return {v: copy_collections(obj[v], src, dst) for v in obj}
225     elif type(obj) == list:
226         return [copy_collections(v, src, dst) for v in obj]
227     return obj
228
229 # copy_git_repos(p, src, dst, dst_repo)
230 #
231 #    Copies all git repositories referenced by pipeline instance or
232 #    template 'p' from src to dst.
233 #
234 #    Git repository dependencies are identified by:
235 #      * p['components'][c]['repository']
236 #      * p['components'][c]['job']['repository']
237 #    for each component c in the pipeline.
238 #
239 #    The pipeline object is updated in place with the new repository
240 #    names.  The return value is undefined.
241 #
242 def copy_git_repos(p, src, dst, dst_repo):
243     copied = set()
244     for c in p['components']:
245         component = p['components'][c]
246         if 'repository' in component:
247             repo = component['repository']
248             if repo not in copied:
249                 copy_git_repo(repo, src, dst, dst_repo)
250                 copied.add(repo)
251             component['repository'] = dst_repo
252         if 'job' in component and 'repository' in component['job']:
253             repo = component['job']['repository']
254             if repo not in copied:
255                 copy_git_repo(repo, src, dst, dst_repo)
256                 copied.add(repo)
257             component['job']['repository'] = dst_repo
258
259 # copy_collection(obj_uuid, src, dst)
260 #
261 #    Copies the collection identified by obj_uuid from src to dst.
262 #    Returns the collection object created at dst.
263 #
264 #    For this application, it is critical to preserve the
265 #    collection's manifest hash, which is not guaranteed with the
266 #    arvados.CollectionReader and arvados.CollectionWriter classes.
267 #    Copying each block in the collection manually, followed by
268 #    the manifest block, ensures that the collection's manifest
269 #    hash will not change.
270 #
271 def copy_collection(obj_uuid, src, dst):
272     c = src.collections().get(uuid=obj_uuid).execute()
273
274     # Check whether a collection with this hash already exists
275     # at the destination.  If so, just return that collection.
276     if 'portable_data_hash' in c:
277         colhash = c['portable_data_hash']
278     else:
279         colhash = c['uuid']
280     dstcol = dst.collections().list(
281         filters=[['portable_data_hash', '=', colhash]]
282     ).execute()
283     if dstcol['items_available'] > 0:
284         return dstcol['items'][0]
285
286     # Fetch the collection's manifest.
287     manifest = c['manifest_text']
288     logging.debug('copying collection %s', obj_uuid)
289     logging.debug('manifest_text = %s', manifest)
290
291     # Enumerate the block locators found in the manifest.
292     collection_blocks = sets.Set()
293     src_keep = arvados.keep.KeepClient(src)
294     for line in manifest.splitlines():
295         try:
296             block_hash = line.split()[1]
297             collection_blocks.add(block_hash)
298         except ValueError:
299             abort('bad manifest line in collection {}: {}'.format(obj_uuid, f))
300
301     # Copy each block from src_keep to dst_keep.
302     dst_keep = arvados.keep.KeepClient(dst)
303     for locator in collection_blocks:
304         data = src_keep.get(locator)
305         logger.debug('copying block %s', locator)
306         logger.info("Retrieved %d bytes", len(data))
307         dst_keep.put(data)
308
309     # Copy the manifest and save the collection.
310     logger.debug('saving {} manifest: {}'.format(obj_uuid, manifest))
311     dst_keep.put(manifest)
312     return dst.collections().create(body={"manifest_text": manifest}).execute()
313
314 # copy_git_repo(src_git_repo, src, dst, dst_git_repo)
315 #
316 #    Copies commits from git repository 'src_git_repo' on Arvados
317 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
318 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
319 #    or "jsmith")
320 #
321 #    All commits will be copied to a destination branch named for the
322 #    source repository URL.
323 #
324 #    Because users cannot create their own repositories, the
325 #    destination repository must already exist.
326 #
327 #    The user running this command must be authenticated
328 #    to both repositories.
329 #
330 def copy_git_repo(src_git_repo, src, dst, dst_git_repo):
331     # Identify the fetch and push URLs for the git repositories.
332     r = src.repositories().list(
333         filters=[['name', '=', src_git_repo]]).execute()
334     if r['items_available'] != 1:
335         raise Exception('cannot identify source repo {}; {} repos found'
336                         .format(src_git_repo, r['items_available']))
337     src_git_url = r['items'][0]['fetch_url']
338     logger.debug('src_git_url: {}'.format(src_git_url))
339
340     r = dst.repositories().list(
341         filters=[['name', '=', dst_git_repo]]).execute()
342     if r['items_available'] != 1:
343         raise Exception('cannot identify source repo {}; {} repos found'
344                         .format(dst_git_repo, r['items_available']))
345     dst_git_push_url  = r['items'][0]['push_url']
346     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
347
348     tmprepo = tempfile.mkdtemp()
349
350     dst_branch = re.sub(r'\W+', '_', src_git_url)
351     arvados.util.run_command(
352         ["git", "clone", src_git_url, tmprepo],
353         cwd=os.path.dirname(tmprepo))
354     arvados.util.run_command(
355         ["git", "checkout", "-b", dst_branch],
356         cwd=tmprepo)
357     arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
358     arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
359
360 # uuid_type(api, object_uuid)
361 #
362 #    Returns the name of the class that object_uuid belongs to, based on
363 #    the second field of the uuid.  This function consults the api's
364 #    schema to identify the object class.
365 #
366 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
367 #
368 #    Special case: if handed a Keep locator hash, return 'Collection'.
369 #
370 def uuid_type(api, object_uuid):
371     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
372         return 'Collection'
373     p = object_uuid.split('-')
374     if len(p) == 3:
375         type_prefix = p[1]
376         for k in api._schema.schemas:
377             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
378             if type_prefix == obj_class:
379                 return k
380     return None
381
382 def abort(msg, code=1):
383     print >>sys.stderr, "arv-copy:", msg
384     exit(code)
385
386 if __name__ == '__main__':
387     main()