3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
5 # Copies an object from Arvados instance src to instance dst.
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst. If either of these files is not found,
17 # arv-copy will issue an error.
# Module-level logger for this tool; its verbosity is configured from the
# --verbose command-line flag during startup.
logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
# NOTE(review): the `local_repo_dir = {}` assignment itself is not visible
# in this chunk, but the dict is read and written throughout the file —
# confirm it is initialized nearby.
# Command-line interface and top-level copy dispatch.
# NOTE(review): in this chunk the `parser.add_argument(` call headers are
# missing between argument groups; each run of quoted option strings below
# belongs to one add_argument call.
parser = argparse.ArgumentParser(
    description='Copy a pipeline instance from one Arvados instance to another.')
    '-v', '--verbose', dest='verbose', action='store_true',
    help='Verbose output.')
    '--progress', dest='progress', action='store_true',
    help='Report progress on copying collections. (default)')
    '--no-progress', dest='progress', action='store_false',
    help='Do not report progress on copying collections.')
    '-f', '--force', dest='force', action='store_true',
    help='Perform copy even if the object appears to exist at the remote destination.')
    '--src', dest='source_arvados', required=True,
    help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
    '--dst', dest='destination_arvados', required=True,
    help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
    '--recursive', dest='recursive', action='store_true',
    help='Recursively copy any dependencies for this object. (default)')
    '--no-recursive', dest='recursive', action='store_false',
    help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
    '--dst-git-repo', dest='dst_git_repo',
    help='The name of the destination git repository. Required when copying a pipeline recursively.')
    '--project-uuid', dest='project_uuid',
    help='The UUID of the project at the destination to which the pipeline should be copied.')
    # Positional argument: the object to copy (its add_argument header is
    # not visible in this chunk).
    help='The UUID of the object to be copied.')
# Copy recursively and report progress unless told otherwise.
parser.set_defaults(progress=True)
parser.set_defaults(recursive=True)

args = parser.parse_args()

# Configure log verbosity.
# NOTE(review): the `if args.verbose:` / `else:` guard lines are not
# visible in this chunk; DEBUG is presumably selected for --verbose and
# INFO otherwise — confirm against the full file.
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.INFO)

# Create API clients for the source and destination instances
src_arv = api_for_instance(args.source_arvados)
dst_arv = api_for_instance(args.destination_arvados)

# Identify the kind of object we have been given, and begin copying.
t = uuid_type(src_arv, args.object_uuid)
# NOTE(review): the opening `if t == 'Collection':` guard is not visible
# in this chunk; the elif chain below dispatches on the object type.
    result = copy_collection(args.object_uuid,
elif t == 'PipelineInstance':
    result = copy_pipeline_instance(args.object_uuid,
elif t == 'PipelineTemplate':
    result = copy_pipeline_template(args.object_uuid,
                                    src_arv, dst_arv, args)
    # Unknown type: bail out (the `else:` line is not visible here).
    abort("cannot copy object {} of type {}".format(args.object_uuid, t))

# Clean up any outstanding temp git repositories.
for d in local_repo_dir.values():
    shutil.rmtree(d, ignore_errors=True)

# If no exception was thrown and the response does not have an
# error_token field, presume success
if 'error_token' in result or 'uuid' not in result:
    logger.error("API server returned an error result: {}".format(result))
    # NOTE(review): the non-zero exit on error is in lines not visible here.
logger.info("Success: created copy with uuid {}".format(result['uuid']))
124 # api_for_instance(instance_name)
126 # Creates an API client for the Arvados instance identified by
129 # If instance_name contains a slash, it is presumed to be a path
130 # (either local or absolute) to a file with Arvados configuration
133 # Otherwise, it is presumed to be the name of a file in
134 # $HOME/.config/arvados/instance_name.conf
def api_for_instance(instance_name):
    """Return an Arvados API client for the named instance.

    If instance_name contains a slash, it is treated as a path (relative
    or absolute) to a config file.  Otherwise the config is read from
    ~/.config/arvados/{instance_name}.conf.  The config must supply
    ARVADOS_API_HOST and ARVADOS_API_TOKEN; if the file is unreadable or
    incomplete, abort() is called (which does not return).
    """
    if '/' in instance_name:
        config_file = instance_name
    else:
        # Use expanduser('~') instead of os.environ['HOME'] so we do not
        # raise KeyError when HOME is unset (e.g. under some daemons or
        # stripped environments); expanduser also works on more platforms.
        config_file = os.path.join(
            os.path.expanduser('~'), '.config', 'arvados',
            "{}.conf".format(instance_name))

    try:
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        # Accept the usual truthy spellings for ARVADOS_API_HOST_INSECURE.
        api_is_insecure = (
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure)
        # NOTE(review): the original call may pass additional keyword
        # arguments (e.g. cache settings) in lines not visible in this
        # chunk — confirm against the full file.
    else:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(
            instance_name))
    return client
164 # copy_pipeline_instance(pi_uuid, src, dst, args)
166 # Copies a pipeline instance identified by pi_uuid from src to dst.
168 # If the args.recursive option is set:
169 # 1. Copies all input collections
170 # * For each component in the pipeline, include all collections
#     listed as job dependencies for that component
172 # 2. Copy docker images
173 # 3. Copy git repositories
174 # 4. Copy the pipeline template
176 # The only changes made to the copied pipeline instance are:
177 # 1. The original pipeline instance UUID is preserved in
178 # the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
179 # 2. The pipeline_template_uuid is changed to the new template uuid.
180 # 3. The owner_uuid of the instance is changed to the user who
def copy_pipeline_instance(pi_uuid, src, dst, args):
    """Copy the pipeline instance pi_uuid from instance src to dst.

    With args.recursive, dependent collections, git repositories and the
    pipeline template are copied first; otherwise only the single record
    is copied and the user is warned on stderr.
    """
    # Fetch the pipeline instance record.
    pi = src.pipeline_instances().get(uuid=pi_uuid).execute()

    # NOTE(review): the `if args.recursive:` guard is not visible in this
    # chunk; the following indented block applies only to recursive copies.
        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy the pipeline template and save the copied template.
        if pi.get('pipeline_template_uuid', None):
            pt = copy_pipeline_template(pi['pipeline_template_uuid'],
            # NOTE(review): the remaining arguments of this call are in
            # lines not visible in this chunk (presumably src, dst, args).

        # Copy input collections, docker images and git repos.
        pi = copy_collections(pi, src, dst, args)
        copy_git_repos(pi, src, dst, args.dst_git_repo)

        # Update the fields of the pipeline instance with the copied
        # template's uuid so the new instance points at the new template.
        if pi.get('pipeline_template_uuid', None):
            pi['pipeline_template_uuid'] = pt['uuid']
    # NOTE(review): non-recursive branch — the `else:` line is not visible
    # in this chunk.  Warn the user that dependencies are left untouched.
        print >>sys.stderr, "Copying only pipeline instance {}.".format(pi_uuid)
        print >>sys.stderr, "You are responsible for making sure all pipeline dependencies have been updated."

    # Update the pipeline instance properties, and create the new
    # record at the destination.
    pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
    pi['description'] = "Pipeline copied from {}\n\n{}".format(
        pi_uuid, pi.get('description', ''))
    # NOTE(review): `dst_project` is assigned in lines not visible in this
    # chunk — presumably derived from args.project_uuid; confirm.
    pi['owner_uuid'] = dst_project
    # Let the API server uniquify the name if it collides.
    pi['ensure_unique_name'] = True

    new_pi = dst.pipeline_instances().create(body=pi).execute()
    # NOTE(review): the `return new_pi` is in a line not visible here.
224 # copy_pipeline_template(pt_uuid, src, dst, args)
226 # Copies a pipeline template identified by pt_uuid from src to dst.
228 # If args.recursive is True, also copy any collections, docker
229 # images and git repositories that this template references.
231 # The owner_uuid of the new template is changed to that of the user
232 # who copied the template.
234 # Returns the copied pipeline template object.
def copy_pipeline_template(pt_uuid, src, dst, args):
    """Copy the pipeline template pt_uuid from src to dst and return the
    newly created template record."""
    # fetch the pipeline template from the source instance
    pt = src.pipeline_templates().get(uuid=pt_uuid).execute()

    # NOTE(review): the `if args.recursive:` guard is not visible in this
    # chunk; the indented block below applies only to recursive copies.
        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy input collections, docker images and git repos.
        pt = copy_collections(pt, src, dst, args)
        copy_git_repos(pt, src, dst, args.dst_git_repo)

    # Annotate the copy with its provenance and let the API server
    # uniquify the name on collision.
    pt['description'] = "Pipeline template copied from {}\n\n{}".format(
        pt_uuid, pt.get('description', ''))
    pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
    pt['ensure_unique_name'] = True

    # Create the new template at the destination and return it.
    return dst.pipeline_templates().create(body=pt).execute()
256 # copy_collections(obj, src, dst, args)
258 # Recursively copies all collections referenced by 'obj' from src
261 # Returns a copy of obj with any old collection uuids replaced by
def copy_collections(obj, src, dst, args):
    """Recursively walk obj (string/dict/list), copying every referenced
    collection from src to dst and substituting the new collection uuids.

    Returns a copy of obj with the substitutions applied.
    """
    if type(obj) in [str, unicode]:
        # A bare string might itself be a collection identifier.
        if uuid_type(src, obj) == 'Collection':
            newc = copy_collection(obj, src, dst, args)
            if obj != newc['uuid'] and obj != newc['portable_data_hash']:
                # NOTE(review): the substitution/return for a changed
                # identifier is in lines not visible in this chunk.
    elif type(obj) == dict:
        # Recurse into every value of the dict.
        return {v: copy_collections(obj[v], src, dst, args) for v in obj}
    elif type(obj) == list:
        return [copy_collections(v, src, dst, args) for v in obj]
    # NOTE(review): the fall-through `return obj` for non-container,
    # non-collection values is in a line not visible in this chunk.
277 # copy_git_repos(p, src, dst, dst_repo)
279 # Copies all git repositories referenced by pipeline instance or
280 # template 'p' from src to dst.
282 # For each component c in the pipeline:
283 # * Copy git repositories named in c['repository'] and c['job']['repository'] if present
284 # * Rename script versions:
285 # * c['script_version']
286 # * c['job']['script_version']
287 # * c['job']['supplied_script_version']
288 # to the commit hashes they resolve to, since any symbolic
289 # names (tags, branches) are not preserved in the destination repo.
291 # The pipeline object is updated in place with the new repository
292 # names. The return value is undefined.
def copy_git_repos(p, src, dst, dst_repo):
    """Copy every git repository referenced by pipeline object p from src
    to dst, rewriting repository names and resolving script versions to
    commit hashes.  p is updated in place; the return value is undefined.
    """
    # NOTE(review): a `copied = set()` (or similar) initialization appears
    # in a line not visible in this chunk; it tracks repos already copied.
    for c in p['components']:
        component = p['components'][c]
        if 'repository' in component:
            repo = component['repository']
            # Copy each distinct source repo only once.
            if repo not in copied:
                copy_git_repo(repo, src, dst, dst_repo)
                # NOTE(review): the `copied.add(repo)` line is not visible
                # in this chunk.
            component['repository'] = dst_repo
            if 'script_version' in component:
                # Resolve symbolic refs (tags/branches) to commit hashes,
                # since symbolic names are not pushed to the destination.
                repo_dir = local_repo_dir[repo]
                component['script_version'] = git_rev_parse(component['script_version'], repo_dir)
        if 'job' in component:
            # NOTE(review): the `j = component['job']` assignment is in a
            # line not visible in this chunk.
            if 'repository' in j:
                repo = j['repository']
                if repo not in copied:
                    copy_git_repo(repo, src, dst, dst_repo)
                    # (copied.add(repo) — line not visible here)
                j['repository'] = dst_repo
                repo_dir = local_repo_dir[repo]
                if 'script_version' in j:
                    j['script_version'] = git_rev_parse(j['script_version'], repo_dir)
                if 'supplied_script_version' in j:
                    j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""
    # NOTE(review): the `total_bytes = 0` and `locators_seen = {}`
    # initializations are in lines not visible in this chunk.
    for line in manifest_text.splitlines():
        # NOTE(review): `words = line.split()` is in a line not visible
        # here; words[0] is the stream name, the rest are locators/files.
        for word in words[1:]:
            # NOTE(review): the `try:` guarding the locator parse is in a
            # line not visible here; the `continue` below belongs to its
            # `except ValueError:` branch.
            loc = arvados.KeepLocator(word)
            continue  # this word isn't a locator, skip it
            # Count each distinct block (by md5) only once.
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size
    # NOTE(review): the `return total_bytes` is in a line not visible here.
340 # copy_collection(obj_uuid, src, dst, args)
342 # Copies the collection identified by obj_uuid from src to dst.
343 # Returns the collection object created at dst.
345 # If args.progress is True, produce a human-friendly progress
348 # If a collection with the desired portable_data_hash already
349 # exists at dst, and args.force is False, copy_collection returns
350 # the existing collection without copying any blocks. Otherwise
351 # (if no collection exists or if args.force is True)
352 # copy_collection copies all of the collection data blocks from src
355 # For this application, it is critical to preserve the
356 # collection's manifest hash, which is not guaranteed with the
357 # arvados.CollectionReader and arvados.CollectionWriter classes.
358 # Copying each block in the collection manually, followed by
359 # the manifest block, ensures that the collection's manifest
360 # hash will not change.
def copy_collection(obj_uuid, src, dst, args):
    """Copy the collection obj_uuid from src to dst block by block,
    preserving the manifest hash, and return the collection created at
    dst (or an existing matching collection when not forced).
    """
    c = src.collections().get(uuid=obj_uuid).execute()

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection without copying any blocks.
    if 'portable_data_hash' in c:
        colhash = c['portable_data_hash']
        # NOTE(review): the `if not args.force:` guard around this lookup
        # is in lines not visible in this chunk.
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        # NOTE(review): the closing `).execute()` of this call is in a
        # line not visible here.
        if dstcol['items_available'] > 0:
            logger.debug("Skipping collection %s (already at dst)", obj_uuid)
            return dstcol['items'][0]

    logger.debug("Copying collection %s", obj_uuid)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=2)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=2)

    # NOTE(review): initializations of `dst_locators`, `bytes_written`
    # and `dst_manifest` are in lines not visible in this chunk.
    bytes_expected = total_collection_size(manifest)
    # NOTE(review): the `if args.progress:` / `else:` guard lines for the
    # two assignments below are not visible in this chunk.
        progress_writer = ProgressWriter(human_progress)
        progress_writer = None

    for line in manifest.splitlines():
        # NOTE(review): `words = line.split()` is in a line not visible
        # here; words[0] is the stream name.
        dst_manifest_line = words[0]
        for word in words[1:]:
            # NOTE(review): a `try:` wraps the locator parse; its
            # `except ValueError:` branch appears further below.
            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum
            # copy this block if we haven't seen it before
            # (otherwise, just reuse the existing dst_locator)
            if blockhash not in dst_locators:
                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                # NOTE(review): an `if progress_writer:` guard for the
                # report call is in a line not visible here.
                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                data = src_keep.get(word)
                dst_locator = dst_keep.put(data)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
            dst_manifest_line += ' ' + dst_locators[blockhash]
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest_line += ' ' + word
        dst_manifest += dst_manifest_line + "\n"

    # NOTE(review): an `if progress_writer:` guard for finish() is in a
    # line not visible here.
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving {} manifest: {}'.format(obj_uuid, dst_manifest))
    dst_keep.put(dst_manifest)

    # NOTE(review): the reassignment of c['owner_uuid'] (presumably to
    # args.project_uuid) is in lines not visible in this chunk.
    if 'owner_uuid' in c:
    c['ensure_unique_name'] = True
    c['manifest_text'] = dst_manifest
    return dst.collections().create(body=c).execute()
438 # copy_git_repo(src_git_repo, src, dst, dst_git_repo)
440 # Copies commits from git repository 'src_git_repo' on Arvados
441 # instance 'src' to 'dst_git_repo' on 'dst'. Both src_git_repo
442 # and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
445 # All commits will be copied to a destination branch named for the
446 # source repository URL.
448 # Because users cannot create their own repositories, the
449 # destination repository must already exist.
451 # The user running this command must be authenticated
452 # to both repositories.
def copy_git_repo(src_git_repo, src, dst, dst_git_repo):
    """Copy commits from repository src_git_repo on instance src to
    repository dst_git_repo on dst, pushing to a branch named for the
    source repo URL.  The destination repository must already exist.
    """
    # Identify the fetch and push URLs for the git repositories.
    r = src.repositories().list(
        filters=[['name', '=', src_git_repo]]).execute()
    if r['items_available'] != 1:
        raise Exception('cannot identify source repo {}; {} repos found'
                        .format(src_git_repo, r['items_available']))
    src_git_url = r['items'][0]['fetch_url']
    logger.debug('src_git_url: {}'.format(src_git_url))

    r = dst.repositories().list(
        filters=[['name', '=', dst_git_repo]]).execute()
    if r['items_available'] != 1:
        raise Exception('cannot identify destination repo {}; {} repos found'
                        .format(dst_git_repo, r['items_available']))
    dst_git_push_url = r['items'][0]['push_url']
    logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))

    # Derive a destination branch name from the source URL (non-word
    # characters collapsed to underscores).
    dst_branch = re.sub(r'\W+', '_', src_git_url)

    # Copy git commits from src repo to dst repo (but only if
    # we have not already copied this repo in this session).
    if src_git_repo in local_repo_dir:
        logger.debug('already copied src repo %s, skipping', src_git_repo)
        # NOTE(review): the early return / else for the skip case is in
        # lines not visible in this chunk.
    tmprepo = tempfile.mkdtemp()
    local_repo_dir[src_git_repo] = tmprepo
    arvados.util.run_command(
        ["git", "clone", src_git_url, tmprepo],
        cwd=os.path.dirname(tmprepo))
    arvados.util.run_command(
        ["git", "checkout", "-b", dst_branch],
        # NOTE(review): the closing `cwd=tmprepo)` of this call is in a
        # line not visible in this chunk.
    arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
    arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
491 # git_rev_parse(rev, repo)
493 # Returns the 40-character commit hash corresponding to 'rev' in
494 # git repository 'repo' (which must be the path of a local git
def git_rev_parse(rev, repo):
    """Resolve 'rev' to its full commit hash in the local git checkout
    located at path 'repo'."""
    out, _err = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return out.strip()
502 # uuid_type(api, object_uuid)
504 # Returns the name of the class that object_uuid belongs to, based on
505 # the second field of the uuid. This function consults the api's
506 # schema to identify the object class.
508 # It returns a string such as 'Collection', 'PipelineInstance', etc.
510 # Special case: if handed a Keep locator hash, return 'Collection'.
def uuid_type(api, object_uuid):
    """Return the schema class name ('Collection', 'PipelineInstance',
    etc.) for object_uuid, based on the uuid's type prefix; a bare Keep
    locator hash is treated as 'Collection'."""
    # A Keep locator: 32 hex digits, a size, and optional hints.
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
        # NOTE(review): the `return 'Collection'` for this case is in a
        # line not visible in this chunk.
    p = object_uuid.split('-')
    # NOTE(review): the extraction of the type prefix (presumably
    # `type_prefix = p[1]`) is in lines not visible in this chunk.
    for k in api._schema.schemas:
        obj_class = api._schema.schemas[k].get('uuidPrefix', None)
        if type_prefix == obj_class:
            # NOTE(review): the `return k` for a match — and any fallback
            # return after the loop — are in lines not visible here.
def abort(msg, code=1):
    """Print an arv-copy error message to stderr and exit the program
    with status 'code' (default 1)."""
    print >>sys.stderr, "arv-copy:", msg
    # NOTE(review): the `exit(code)` call is in a line not visible in
    # this chunk.
529 # Code for reporting on the progress of a collection upload.
530 # Stolen from arvados.commands.put.ArvPutCollectionWriter
531 # TODO(twp): figure out how to refactor into a shared library
532 # (may involve refactoring some arvados.commands.copy.copy_collection
def machine_progress(obj_uuid, bytes_written, bytes_expected):
    """Return a machine-parseable one-line progress report."""
    # NOTE(review): the middle arguments of this format call are in lines
    # not visible in this chunk — presumably program name / pid, the
    # object uuid and the byte count; confirm against the full file.
    return "{} {}: {} {} written {} total\n".format(
        # -1 signals an unknown total size.
        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
    """Return a human-friendly one-line progress report.

    When the expected total is known, show megabytes written/expected
    and a percentage; otherwise show only the raw byte count.  The
    leading carriage return lets successive reports overwrite each
    other on the terminal.
    """
    if not bytes_expected:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
    return "\r{}: {}M / {}M {:.1%} ".format(
        obj_uuid, bytes_written >> 20, bytes_expected >> 20,
        float(bytes_written) / bytes_expected)
class ProgressWriter(object):
    # Formatter callable (e.g. human_progress); None disables reporting.
    _progress_func = None
    # NOTE(review): the `outfile` attribute (written to by report/finish)
    # is assigned in a line not visible in this chunk — presumably
    # sys.stderr; confirm.

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        # Emit one formatted progress line via the configured formatter.
        if self._progress_func is not None:
            # NOTE(review): the `self.outfile.write(` call header is in a
            # line not visible in this chunk.
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    # NOTE(review): the `def finish(self):` header is in a line not
    # visible in this chunk; it terminates the progress line.
        self.outfile.write("\n")
567 if __name__ == '__main__':