d8ce4f4cdaa9fc977a935ad14f9878b228c9b5d3
[arvados.git] / sdk / python / arvados / commands / copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import sets
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31
32 logger = logging.getLogger('arvados.arv-copy')
33
34 def main():
35     logger.setLevel(logging.DEBUG)
36
37     parser = argparse.ArgumentParser(
38         description='Copy a pipeline instance from one Arvados instance to another.')
39
40     parser.add_argument(
41         '--recursive', dest='recursive', action='store_true',
42         help='Recursively copy any dependencies for this object. (default)')
43     parser.add_argument(
44         '--no-recursive', dest='recursive', action='store_false',
45         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
46     parser.add_argument(
47         '--dst-git-repo', dest='dst_git_repo',
48         help='The name of the destination git repository.')
49     parser.add_argument(
50         '--project_uuid', dest='project_uuid',
51         help='The UUID of the project at the destination to which the pipeline should be copied.')
52     parser.add_argument(
53         'object_uuid',
54         help='The UUID of the object to be copied.')
55     parser.add_argument(
56         'source_arvados',
57         help='The name of the source Arvados instance.')
58     parser.add_argument(
59         'destination_arvados',
60         help='The name of the destination Arvados instance.')
61     parser.set_defaults(recursive=True)
62
63     args = parser.parse_args()
64
65     # Create API clients for the source and destination instances
66     src_arv = api_for_instance(args.source_arvados)
67     dst_arv = api_for_instance(args.destination_arvados)
68
69     # Identify the kind of object we have been given, and begin copying.
70     t = uuid_type(src_arv, args.object_uuid)
71     if t == 'Collection':
72         result = copy_collection(args.object_uuid, src=src_arv, dst=dst_arv)
73     elif t == 'PipelineInstance':
74         result = copy_pipeline_instance(args.object_uuid,
75                                         src_arv, dst_arv,
76                                         args.dst_git_repo,
77                                         dst_project=args.project_uuid,
78                                         recursive=args.recursive)
79     elif t == 'PipelineTemplate':
80         result = copy_pipeline_template(args.object_uuid,
81                                         src_arv, dst_arv,
82                                         args.dst_git_repo,
83                                         recursive=args.recursive)
84     else:
85         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
86
87     print result
88     exit(0)
89
90 # api_for_instance(instance_name)
91 #
92 #     Creates an API client for the Arvados instance identified by
93 #     instance_name.  Credentials must be stored in
94 #     $HOME/.config/arvados/instance_name.conf
95 #
96 def api_for_instance(instance_name):
97     if '/' in instance_name:
98         abort('illegal instance name {}'.format(instance_name))
99     config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
100     cfg = arvados.config.load(config_file)
101
102     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
103         api_is_insecure = (
104             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
105                 ['1', 't', 'true', 'y', 'yes']))
106         client = arvados.api('v1',
107                              host=cfg['ARVADOS_API_HOST'],
108                              token=cfg['ARVADOS_API_TOKEN'],
109                              insecure=api_is_insecure,
110                              cache=False)
111     else:
112         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
113     return client
114
115 # copy_pipeline_instance(pi_uuid, dst_git_repo, dst_project, recursive, src, dst)
116 #
117 #    Copies a pipeline instance identified by pi_uuid from src to dst.
118 #
119 #    If the 'recursive' option evaluates to True:
120 #      1. Copies all input collections
121 #           * For each component in the pipeline, include all collections
122 #             listed as job dependencies for that component)
123 #      2. Copy docker images
124 #      3. Copy git repositories
125 #      4. Copy the pipeline template
126 #
127 #    The only changes made to the copied pipeline instance are:
128 #      1. The original pipeline instance UUID is preserved in
129 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
130 #      2. The pipeline_template_uuid is changed to the new template uuid.
131 #      3. The owner_uuid of the instance is changed to the user who
132 #         copied it.
133 #
134 def copy_pipeline_instance(pi_uuid, src, dst, dst_git_repo, dst_project=None, recursive=True):
135     # Fetch the pipeline instance record.
136     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
137
138     if recursive:
139         # Copy the pipeline template and save the copied template.
140         if pi.get('pipeline_template_uuid', None):
141             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
142                                         src, dst,
143                                         dst_git_repo,
144                                         recursive=True)
145
146         # Copy input collections, docker images and git repos.
147         pi = copy_collections(pi, src, dst)
148         copy_git_repos(pi, src, dst, dst_git_repo)
149
150         # Update the fields of the pipeline instance with the copied
151         # pipeline template.
152         if pi.get('pipeline_template_uuid', None):
153             pi['pipeline_template_uuid'] = pt['uuid']
154         if dst_project:
155             pi['owner_uuid'] = dst_project
156         else:
157             del pi['owner_uuid']
158
159     else:
160         # not recursive
161         print >>sys.stderr, "Copying only pipeline instance {}.".format(pi_uuid)
162         print >>sys.stderr, "You are responsible for making sure all pipeline dependencies have been updated."
163
164     # Create the new pipeline instance at the destination Arvados.
165     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
166     del pi['uuid']
167     pi['ensure_unique_name'] = True
168     new_pi = dst.pipeline_instances().create(body=pi).execute()
169     return new_pi
170
171 # copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive)
172 #
173 #    Copies a pipeline template identified by pt_uuid from src to dst.
174 #
175 #    If the 'recursive' option evaluates to true, also copy any collections,
176 #    docker images and git repositories that this template references.
177 #
178 #    The owner_uuid of the new template is changed to that of the user
179 #    who copied the template.
180 #
181 #    Returns the copied pipeline template object.
182 #
183 def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True):
184     # fetch the pipeline template from the source instance
185     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
186
187     if recursive:
188         # Copy input collections, docker images and git repos.
189         pt = copy_collections(pt, src, dst)
190         copy_git_repos(pt, src, dst, dst_git_repo)
191
192     pt['name'] = pt['name'] + ' copy'
193     pt['ensure_unique_name'] = True
194     del pt['uuid']
195     del pt['owner_uuid']
196
197     return dst.pipeline_templates().create(body=pt).execute()
198
199 # copy_collections(obj, src, dst)
200 #
201 #    Recursively copies all collections referenced by 'obj' from src
202 #    to dst.
203 #
204 #    Returns a copy of obj with any old collection uuids replaced by
205 #    the new ones.
206 #
207 def copy_collections(obj, src, dst):
208     if type(obj) in [str, unicode]:
209         if uuid_type(src, obj) == 'Collection':
210             newc = copy_collection(obj, src, dst)
211             if obj != newc['uuid'] and obj != newc['portable_data_hash']:
212                 return newc['uuid']
213         return obj
214     elif type(obj) == dict:
215         return {v: copy_collections(obj[v], src, dst) for v in obj}
216     elif type(obj) == list:
217         return [copy_collections(v, src, dst) for v in obj]
218     return obj
219
220 # copy_git_repos(p, src, dst, dst_repo)
221 #
222 #    Copies all git repositories referenced by pipeline instance or
223 #    template 'p' from src to dst.
224 #
225 #    Git repository dependencies are identified by:
226 #      * p['components'][c]['repository']
227 #      * p['components'][c]['job']['repository']
228 #    for each component c in the pipeline.
229 #
230 #    The pipeline object is updated in place with the new repository
231 #    names.  The return value is undefined.
232 #
233 def copy_git_repos(p, src, dst, dst_repo):
234     copied = set()
235     for c in p['components']:
236         component = p['components'][c]
237         if 'repository' in component:
238             repo = component['repository']
239             if repo not in copied:
240                 copy_git_repo(repo, src, dst, dst_repo)
241                 copied.add(repo)
242             component['repository'] = dst_repo
243         if 'job' in component and 'repository' in component['job']:
244             repo = component['job']['repository']
245             if repo not in copied:
246                 copy_git_repo(repo, src, dst, dst_repo)
247                 copied.add(repo)
248             component['job']['repository'] = dst_repo
249
250 # copy_collection(obj_uuid, src, dst)
251 #
252 #    Copies the collection identified by obj_uuid from src to dst.
253 #    Returns the collection object created at dst.
254 #
255 #    For this application, it is critical to preserve the
256 #    collection's manifest hash, which is not guaranteed with the
257 #    arvados.CollectionReader and arvados.CollectionWriter classes.
258 #    Copying each block in the collection manually, followed by
259 #    the manifest block, ensures that the collection's manifest
260 #    hash will not change.
261 #
262 def copy_collection(obj_uuid, src, dst):
263     c = src.collections().get(uuid=obj_uuid).execute()
264
265     # Check whether a collection with this hash already exists
266     # at the destination.  If so, just return that collection.
267     if 'portable_data_hash' in c:
268         colhash = c['portable_data_hash']
269     else:
270         colhash = c['uuid']
271     dstcol = dst.collections().list(
272         filters=[['portable_data_hash', '=', colhash]]
273     ).execute()
274     if dstcol['items_available'] > 0:
275         return dstcol['items'][0]
276
277     # Fetch the collection's manifest.
278     manifest = c['manifest_text']
279     logging.debug('copying collection %s', obj_uuid)
280     logging.debug('manifest_text = %s', manifest)
281
282     # Enumerate the block locators found in the manifest.
283     collection_blocks = sets.Set()
284     src_keep = arvados.keep.KeepClient(src)
285     for line in manifest.splitlines():
286         try:
287             block_hash = line.split()[1]
288             collection_blocks.add(block_hash)
289         except ValueError:
290             abort('bad manifest line in collection {}: {}'.format(obj_uuid, f))
291
292     # Copy each block from src_keep to dst_keep.
293     dst_keep = arvados.keep.KeepClient(dst)
294     for locator in collection_blocks:
295         data = src_keep.get(locator)
296         logger.debug('copying block %s', locator)
297         logger.info("Retrieved %d bytes", len(data))
298         dst_keep.put(data)
299
300     # Copy the manifest and save the collection.
301     logger.debug('saving {} manifest: {}'.format(obj_uuid, manifest))
302     dst_keep.put(manifest)
303     return dst.collections().create(body={"manifest_text": manifest}).execute()
304
305 # copy_git_repo(src_git_repo, src, dst, dst_git_repo)
306 #
307 #    Copies commits from git repository 'src_git_repo' on Arvados
308 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
309 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
310 #    or "jsmith")
311 #
312 #    All commits will be copied to a destination branch named for the
313 #    source repository URL.
314 #
315 #    Because users cannot create their own repositories, the
316 #    destination repository must already exist.
317 #
318 #    The user running this command must be authenticated
319 #    to both repositories.
320 #
321 def copy_git_repo(src_git_repo, src, dst, dst_git_repo):
322     # Identify the fetch and push URLs for the git repositories.
323     r = src.repositories().list(
324         filters=[['name', '=', src_git_repo]]).execute()
325     if r['items_available'] != 1:
326         raise Exception('cannot identify source repo {}; {} repos found'
327                         .format(src_git_repo, r['items_available']))
328     src_git_url = r['items'][0]['fetch_url']
329     logger.debug('src_git_url: {}'.format(src_git_url))
330
331     r = dst.repositories().list(
332         filters=[['name', '=', dst_git_repo]]).execute()
333     if r['items_available'] != 1:
334         raise Exception('cannot identify source repo {}; {} repos found'
335                         .format(dst_git_repo, r['items_available']))
336     dst_git_push_url  = r['items'][0]['push_url']
337     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
338
339     # If there is a /scratch partition available, attempt to use it
340     # to check out the git repo (which can be quite large)
341     if os.path.exists('/scratch'):
342         tempfile.tempdir = '/scratch/{}'.format(getpass.getuser())
343         os.mkdir(tempfile.tempdir)
344     tmprepo = tempfile.mkdtemp()
345
346     dst_branch = re.sub(r'\W+', '_', src_git_url)
347     arvados.util.run_command(
348         ["git", "clone", src_git_url, tmprepo],
349         cwd=os.path.dirname(tmprepo))
350     arvados.util.run_command(
351         ["git", "checkout", "-b", dst_branch],
352         cwd=tmprepo)
353     arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
354     arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
355
356 # uuid_type(api, object_uuid)
357 #
358 #    Returns the name of the class that object_uuid belongs to, based on
359 #    the second field of the uuid.  This function consults the api's
360 #    schema to identify the object class.
361 #
362 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
363 #
364 #    Special case: if handed a Keep locator hash, return 'Collection'.
365 #
366 def uuid_type(api, object_uuid):
367     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
368         return 'Collection'
369     p = object_uuid.split('-')
370     if len(p) == 3:
371         type_prefix = p[1]
372         for k in api._schema.schemas:
373             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
374             if type_prefix == obj_class:
375                 return k
376     return None
377
378 def abort(msg, code=1):
379     print >>sys.stderr, "arv-copy:", msg
380     exit(code)
381
382 if __name__ == '__main__':
383     main()