# arv-copy [--recursive] [--no-recursive] [--dst-git-repo REPO]
#          [--project_uuid UUID] object-uuid src dst
#
# Copies an object from Arvados instance src to instance dst.
#
# By default, arv-copy recursively copies any dependent objects
# necessary to make the object functional in the new instance
# (e.g. for a pipeline instance, arv-copy copies the pipeline
# template, input collections, docker images and git repositories).
# If --no-recursive is given, arv-copy copies only the single record
# identified by object-uuid.
#
# The user must have files $HOME/.config/arvados/{src}.conf and
# $HOME/.config/arvados/{dst}.conf with valid login credentials for
# instances src and dst. If either of these files is not found,
# arv-copy will issue an error.
import argparse
import logging
import os
import re
import shutil
import sys
import tempfile

import arvados
import arvados.config
import arvados.keep
import arvados.util

logger = logging.getLogger('arvados.arv-copy')
# main()
#
# Entry point. Parses the command line, creates API clients for the
# source and destination instances, and dispatches on the type of
# object_uuid (Collection, PipelineInstance or PipelineTemplate).
# Calls abort() for unsupported object types and exits with status 1
# if the destination API server returns an error result.
#
def main():
    logger.setLevel(logging.DEBUG)

    parser = argparse.ArgumentParser(
        description='Copy a pipeline instance, pipeline template or collection from one Arvados instance to another.')
    parser.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object. (default)')
    parser.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
    parser.add_argument(
        '--dst-git-repo', dest='dst_git_repo',
        help='The name of the destination git repository.')
    # '--project-uuid' is accepted as well, for consistency with '--dst-git-repo'.
    parser.add_argument(
        '--project-uuid', '--project_uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the pipeline should be copied.')
    parser.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    parser.add_argument(
        'source_arvados',
        help='The name of the source Arvados instance.')
    parser.add_argument(
        'destination_arvados',
        help='The name of the destination Arvados instance.')
    parser.set_defaults(recursive=True)

    args = parser.parse_args()

    # Create API clients for the source and destination instances.
    src_arv = api_for_instance(args.source_arvados)
    dst_arv = api_for_instance(args.destination_arvados)

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)
    if t == 'Collection':
        result = copy_collection(args.object_uuid, src=src_arv, dst=dst_arv)
    elif t == 'PipelineInstance':
        result = copy_pipeline_instance(args.object_uuid,
                                        src=src_arv, dst=dst_arv,
                                        dst_git_repo=args.dst_git_repo,
                                        dst_project=args.project_uuid,
                                        recursive=args.recursive)
    elif t == 'PipelineTemplate':
        result = copy_pipeline_template(args.object_uuid,
                                        src=src_arv, dst=dst_arv,
                                        dst_git_repo=args.dst_git_repo,
                                        recursive=args.recursive)
    else:
        # abort() exits, so 'result' is always bound below.
        abort("cannot copy object {} of type {}".format(args.object_uuid, t))

    # If no exception was thrown and the response does not have an
    # error_token field, presume success.
    if 'error_token' in result or 'uuid' not in result:
        logger.error("API server returned an error result: %s", result)
        sys.exit(1)

    logger.info("Success: created copy with uuid %s", result['uuid'])

# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name. Credentials must be stored in
# $HOME/.config/arvados/<instance_name>.conf, which must define
# ARVADOS_API_HOST and ARVADOS_API_TOKEN. If ARVADOS_API_HOST_INSECURE
# is one of 1/t/true/y/yes (case-insensitive), the client is created
# with insecure=True.
#
# Calls abort() if instance_name contains '/', if the config file does
# not exist, or if either required setting is missing.
#
def api_for_instance(instance_name):
    # Reject path separators so the name cannot escape the config dir.
    if '/' in instance_name:
        abort('illegal instance name {}'.format(instance_name))
    # expanduser() avoids a KeyError when $HOME is unset.
    config_file = os.path.join(os.path.expanduser('~'), '.config', 'arvados',
                               "{}.conf".format(instance_name))
    if not os.path.isfile(config_file):
        abort('config file {} not found for instance {}'.format(config_file, instance_name))
    cfg = arvados.config.load(config_file)

    if 'ARVADOS_API_HOST' not in cfg or 'ARVADOS_API_TOKEN' not in cfg:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))

    api_is_insecure = cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in (
        '1', 't', 'true', 'y', 'yes')
    return arvados.api('v1',
                       host=cfg['ARVADOS_API_HOST'],
                       token=cfg['ARVADOS_API_TOKEN'],
                       insecure=api_is_insecure)

# copy_pipeline_instance(pi_uuid, src, dst, dst_git_repo, dst_project, recursive)
#
# Copies a pipeline instance identified by pi_uuid from src to dst.
#
# If 'recursive' evaluates to True:
#   1. Copies the pipeline template (copy_pipeline_template).
#   2. Copies every collection referenced anywhere in the instance
#      record (copy_collections) and rewrites the references.
#   3. Copies the git repositories named by the components into
#      dst_git_repo (copy_git_repos).
#   No separate docker image step is performed here. An image is
#   carried over only if it is referenced by a string that
#   copy_collections recognizes as a collection.
# Otherwise only the instance record is copied, and a warning is
# written to stderr.
#
# The only changes made to the copied pipeline instance are:
#   1. The original pipeline instance UUID is preserved in
#      the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
#   2. The pipeline_template_uuid is changed to the new template uuid
#      (recursive copies only).
#   3. owner_uuid is set to dst_project if one is given. Otherwise it
#      is removed. NOTE(review): this relies on dst making the calling
#      user the owner; confirm.
#   4. uuid is removed and ensure_unique_name is requested, so that a
#      new record is created.
#
# Returns the pipeline instance record created at dst.
#
def copy_pipeline_instance(pi_uuid, src, dst, dst_git_repo, dst_project=None, recursive=True):
    # Fetch the pipeline instance record.
    pi = src.pipeline_instances().get(uuid=pi_uuid).execute()

    if recursive:
        new_template_uuid = None
        if pi.get('pipeline_template_uuid', None):
            new_pt = copy_pipeline_template(pi['pipeline_template_uuid'],
                                            src, dst, dst_git_repo,
                                            recursive=True)
            new_template_uuid = new_pt['uuid']

        # Copy input collections and git repos referenced by the instance.
        pi = copy_collections(pi, src, dst)
        copy_git_repos(pi, src, dst, dst_git_repo)

        # Point the instance at the copied pipeline template.
        if new_template_uuid:
            pi['pipeline_template_uuid'] = new_template_uuid
    else:
        sys.stderr.write("Copying only pipeline instance {}.\n".format(pi_uuid))
        sys.stderr.write("You are responsible for making sure all pipeline dependencies have been updated.\n")

    # Record provenance. Guard against a missing or null 'properties' field.
    properties = pi.get('properties') or {}
    properties['copied_from_pipeline_instance_uuid'] = pi_uuid
    pi['properties'] = properties

    if dst_project:
        pi['owner_uuid'] = dst_project
    else:
        pi.pop('owner_uuid', None)
    pi.pop('uuid', None)
    pi['ensure_unique_name'] = True

    return dst.pipeline_instances().create(body=pi).execute()

# copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive)
#
# Copies a pipeline template identified by pt_uuid from src to dst.
#
# If 'recursive' evaluates to true, also copies every collection the
# template references (copy_collections) and the git repositories
# named by its components into dst_git_repo (copy_git_repos). The
# template is rewritten to refer to the copies.
#
# The copy is named "<original name> copy" and is created with
# ensure_unique_name set. Its uuid and owner_uuid are removed so that
# the new template is owned by the user who copied it.
# NOTE(review): this relies on dst defaulting the owner to the
# caller; confirm.
#
# Returns the pipeline template record created at dst.
#
def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True):
    # Fetch the pipeline template from the source instance.
    pt = src.pipeline_templates().get(uuid=pt_uuid).execute()

    if recursive:
        # Copy input collections and git repos.
        pt = copy_collections(pt, src, dst)
        copy_git_repos(pt, src, dst, dst_git_repo)

    # A null name would otherwise raise TypeError on concatenation.
    pt['name'] = (pt.get('name') or '') + ' copy'
    pt['ensure_unique_name'] = True
    pt.pop('uuid', None)
    pt.pop('owner_uuid', None)

    return dst.pipeline_templates().create(body=pt).execute()

# copy_collections(obj, src, dst)
#
# Recursively copies all collections referenced by 'obj' from src to
# dst. obj may be any JSON-like value (dicts, lists, strings, numbers,
# None, ...). Every string that uuid_type() identifies as a Collection
# is copied with copy_collection().
#
# Returns a copy of obj in which each collection reference is replaced
# by the uuid of its copy at dst. A reference that already equals the
# copy's uuid or portable data hash is left unchanged. Dicts and lists
# are rebuilt rather than modified, so obj itself is not changed.
#
def copy_collections(obj, src, dst):
    # type(u'') is 'unicode' on Python 2 and 'str' on Python 3.
    if isinstance(obj, (str, type(u''))):
        if uuid_type(src, obj) == 'Collection':
            newc = copy_collection(obj, src, dst)
            if obj != newc['uuid'] and obj != newc.get('portable_data_hash'):
                return newc['uuid']
        return obj
    if isinstance(obj, dict):
        return {k: copy_collections(v, src, dst) for k, v in obj.items()}
    if isinstance(obj, list):
        return [copy_collections(v, src, dst) for v in obj]
    return obj

# copy_git_repos(p, src, dst, dst_repo)
#
# Copies all git repositories referenced by pipeline instance or
# template 'p' from src into the single destination repository
# dst_repo on dst.
#
# Git repository dependencies are identified by:
#   * p['components'][c]['repository']
#   * p['components'][c]['job']['repository']
# for each component c in the pipeline. Each distinct source
# repository is copied once, even if several components name it.
#
# p is updated in place: every such 'repository' field is set to
# dst_repo. Returns None.
#
def copy_git_repos(p, src, dst, dst_repo):
    copied = set()

    def _copy_once(repo):
        # Skip repositories already pushed during this call.
        if repo not in copied:
            copy_git_repo(repo, src, dst, dst_repo)
            copied.add(repo)

    for component in (p.get('components') or {}).values():
        if 'repository' in component:
            _copy_once(component['repository'])
            component['repository'] = dst_repo
        job = component.get('job')
        if job and 'repository' in job:
            _copy_once(job['repository'])
            job['repository'] = dst_repo

# _manifest_block_locators(manifest_text)
#
# Returns the set of Keep block locators referenced by a manifest.
# Each non-blank manifest line has the form
# "<stream name> <locator>... <pos:size:filename>...". Every token after
# the stream name that starts with a 32-hex-digit hash is taken as a
# locator, so streams spanning several blocks are fully covered. Blank
# lines are skipped. Raises ValueError(line) for a non-blank line that
# has no locator.
#
def _manifest_block_locators(manifest_text):
    locators = set()
    for line in manifest_text.splitlines():
        tokens = line.split()
        if not tokens:
            continue
        line_locators = [tok for tok in tokens[1:]
                         if re.match(r'^[0-9a-f]{32}(\+|$)', tok)]
        if not line_locators:
            raise ValueError(line)
        locators.update(line_locators)
    return locators


# copy_collection(obj_uuid, src, dst)
#
# Copies the collection identified by obj_uuid from src to dst and
# returns the collection record created at dst. Callers pass either a
# collection uuid or a portable data hash. If dst already has a
# collection with the same portable data hash, that existing record is
# returned and nothing is copied.
#
# For this application, it is critical to preserve the collection's
# manifest hash, which is not guaranteed with the
# arvados.CollectionReader and arvados.CollectionWriter classes.
# Copying each block referenced by the manifest, followed by the
# manifest text itself, ensures that the collection's manifest hash
# will not change.
#
# NOTE(review): the manifest is saved verbatim, so any permission
# signatures on the source locators are carried over; confirm that dst
# accepts them.
#
def copy_collection(obj_uuid, src, dst):
    c = src.collections().get(uuid=obj_uuid).execute()

    # Check whether a collection with this hash already exists at the
    # destination. If so, just return that collection. Fall back to the
    # uuid when the record has no portable data hash.
    colhash = c.get('portable_data_hash') or c['uuid']
    dstcol = dst.collections().list(
        filters=[['portable_data_hash', '=', colhash]]
    ).execute()
    if dstcol['items_available'] > 0:
        return dstcol['items'][0]

    manifest = c['manifest_text']
    logger.debug('copying collection %s', obj_uuid)
    logger.debug('manifest_text = %s', manifest)

    try:
        collection_blocks = _manifest_block_locators(manifest)
    except ValueError as err:
        abort('bad manifest line in collection {}: {}'.format(obj_uuid, err.args[0]))

    # Copy each block from src_keep to dst_keep.
    src_keep = arvados.keep.KeepClient(src)
    dst_keep = arvados.keep.KeepClient(dst)
    for locator in collection_blocks:
        logger.debug('copying block %s', locator)
        data = src_keep.get(locator)
        logger.info("Retrieved %d bytes", len(data))
        dst_keep.put(data)

    # Copy the manifest and save the collection.
    logger.debug('saving %s manifest: %s', obj_uuid, manifest)
    dst_keep.put(manifest)
    return dst.collections().create(body={"manifest_text": manifest}).execute()

# _lookup_repo_url(api, repo_name, url_field, role)
#
# Returns field url_field (e.g. 'fetch_url' or 'push_url') of the single
# repository named repo_name on api. Raises Exception, with 'role'
# ("source"/"destination") in the message, unless exactly one
# repository matches.
#
def _lookup_repo_url(api, repo_name, url_field, role):
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute()
    if r['items_available'] != 1:
        raise Exception('cannot identify {} repo {}; {} repos found'
                        .format(role, repo_name, r['items_available']))
    return r['items'][0][url_field]


# copy_git_repo(src_git_repo, src, dst, dst_git_repo)
#
# Copies commits from git repository 'src_git_repo' on Arvados
# instance 'src' to 'dst_git_repo' on 'dst'. Both src_git_repo and
# dst_git_repo are repository names, not UUIDs (e.g. "arvados").
#
# The source is cloned into a temporary directory, which is removed
# afterwards. The branch that `git clone` checks out is pushed to a
# destination branch named after the source repository URL, with
# every run of non-word characters replaced by '_'.
#
# Because users cannot create their own repositories, the
# destination repository must already exist.
#
# The user running this command must be authenticated to both
# repositories. Raises Exception if either name does not match exactly
# one repository.
#
def copy_git_repo(src_git_repo, src, dst, dst_git_repo):
    # Identify the fetch and push URLs for the git repositories.
    src_git_url = _lookup_repo_url(src, src_git_repo, 'fetch_url', 'source')
    logger.debug('src_git_url: {}'.format(src_git_url))
    dst_git_push_url = _lookup_repo_url(dst, dst_git_repo, 'push_url', 'destination')
    logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))

    dst_branch = re.sub(r'\W+', '_', src_git_url)
    tmprepo = tempfile.mkdtemp()
    try:
        arvados.util.run_command(
            ["git", "clone", src_git_url, tmprepo],
            cwd=os.path.dirname(tmprepo))
        arvados.util.run_command(
            ["git", "checkout", "-b", dst_branch],
            cwd=tmprepo)
        arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
        arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
    finally:
        # Always remove the temporary clone, including when a git step fails.
        shutil.rmtree(tmprepo, ignore_errors=True)

# uuid_type(api, object_uuid)
#
# Returns the name of the class that object_uuid belongs to, based on
# the second '-'-separated field of the uuid. The class is found by
# matching that field against the 'uuidPrefix' of each entry in the
# api's schema.
#
# Returns a string such as 'Collection' or 'PipelineInstance'. Returns
# None if object_uuid does not have three '-'-separated fields or if
# no schema entry has a matching prefix.
#
# Special case: if handed a Keep locator hash (portable data hash),
# return 'Collection'.
#
def uuid_type(api, object_uuid):
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
        return 'Collection'
    parts = object_uuid.split('-')
    if len(parts) != 3:
        return None
    type_prefix = parts[1]
    for class_name, schema in api._schema.schemas.items():
        if schema.get('uuidPrefix', None) == type_prefix:
            return class_name
    return None

# abort(msg, code=1)
#
# Writes "arv-copy: <msg>" to stderr and terminates the program with
# exit status 'code' by raising SystemExit.
#
def abort(msg, code=1):
    sys.stderr.write("arv-copy: {}\n".format(msg))
    sys.exit(code)

# Run the command-line entry point only when executed as a script.
if __name__ == '__main__':
    main()