1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 # arv-copy [--recursive] [--no-recursive] object-uuid
7 # Copies an object from Arvados instance src to instance dst.
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
16 # The user must have configuration files {src}.conf and
17 # {dst}.conf in a standard configuration directory with valid login credentials
18 # for instances src and dst. If either of these files is not found,
19 # arv-copy will issue an error.
41 import arvados.commands._util as arv_cmd
42 import arvados.commands.keepdocker
43 import arvados.http_to_keep
45 from arvados._version import __version__
# Regex matching a full or abbreviated (1-40 hex digit) git commit hash.
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
# NOTE(review): the `local_repo_dir = {}` assignment appears to be missing
# from this fragment -- it is iterated in the cleanup loop of main().

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
# NOTE(review): the `src_owner_uuid = None` initializer appears to be
# missing here -- set_src_owner_uuid() assigns it via `global`.
# NOTE(review): this section appears to be the body of main(); the
# enclosing `def main(...)` line is not visible in this fragment, and
# several statements (branch headers, try/except) appear truncated below.
copy_opts = argparse.ArgumentParser(add_help=False)

copy_opts.add_argument(
    '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
copy_opts.add_argument(
    '-v', '--verbose', dest='verbose', action='store_true',
    help='Verbose output.')
copy_opts.add_argument(
    '--progress', dest='progress', action='store_true',
    help='Report progress on copying collections. (default)')
copy_opts.add_argument(
    '--no-progress', dest='progress', action='store_false',
    help='Do not report progress on copying collections.')
copy_opts.add_argument(
    '-f', '--force', dest='force', action='store_true',
    help='Perform copy even if the object appears to exist at the remote destination.')
copy_opts.add_argument(
    '--src', dest='source_arvados',
    # NOTE(review): the opening `help="""` of this option's help text
    # appears to be missing here.
    Client configuration location for the source Arvados cluster.
    May be either a configuration file path, or a plain identifier like `foo`
    to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
    If not provided, will search for a configuration file named after the cluster ID of the source object UUID.
copy_opts.add_argument(
    '--dst', dest='destination_arvados',
    # NOTE(review): the opening `help="""` of this option's help text
    # appears to be missing here.
    Client configuration location for the destination Arvados cluster.
    May be either a configuration file path, or a plain identifier like `foo`
    to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
    If not provided, will use the default client configuration from the environment or `settings.conf`.
copy_opts.add_argument(
    '--recursive', dest='recursive', action='store_true',
    help='Recursively copy any dependencies for this object, and subprojects. (default)')
copy_opts.add_argument(
    '--no-recursive', dest='recursive', action='store_false',
    help='Do not copy any dependencies or subprojects.')
copy_opts.add_argument(
    '--project-uuid', dest='project_uuid',
    help='The UUID of the project at the destination to which the collection or workflow should be copied.')
copy_opts.add_argument(
    '--storage-classes', dest='storage_classes',
    help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
copy_opts.add_argument("--varying-url-params", type=str, default="",
                       help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")

copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
                       help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")

copy_opts.add_argument(
    # NOTE(review): the positional argument name (presumably
    # 'object_uuid', read below as args.object_uuid) appears missing.
    help='The UUID of the object to be copied.')
copy_opts.set_defaults(progress=True)
copy_opts.set_defaults(recursive=True)

parser = argparse.ArgumentParser(
    description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
    parents=[copy_opts, arv_cmd.retry_opt])
args = parser.parse_args()

# Normalize --storage-classes into a list of nonempty class names.
if args.storage_classes:
    args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]

# NOTE(review): the `if args.verbose:` / `else:` lines selecting between
# these two log levels appear to be missing here.
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.INFO)

# No source cluster named: derive it from the first field (the cluster
# ID) of the object UUID.
if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
    args.source_arvados = args.object_uuid[:5]

# Create API clients for the source and destination instances
src_arv = api_for_instance(args.source_arvados, args.retries)
dst_arv = api_for_instance(args.destination_arvados, args.retries)

# Default the destination project to the current user's home project.
if not args.project_uuid:
    args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

# Identify the kind of object we have been given, and begin copying.
t = uuid_type(src_arv, args.object_uuid)

# NOTE(review): the `try:` wrapping this dispatch (paired with the
# `except` below) appears to be missing here.
if t == 'Collection':
    set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
    result = copy_collection(args.object_uuid,
    # NOTE(review): the remaining arguments of this call appear truncated.
elif t == 'Workflow':
    set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
    result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
# NOTE(review): the `elif` headers for the Group / httpURL / fallback
# branches appear to be missing before the next four statements.
    set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
    result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
    result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
    abort("cannot copy object {} of type {}".format(args.object_uuid, t))
except Exception as e:
    logger.error("%s", e, exc_info=args.verbose)
    # NOTE(review): an exit() call is presumably missing here -- confirm.

# Clean up any outstanding temp git repositories.
for d in local_repo_dir.values():
    shutil.rmtree(d, ignore_errors=True)

# If no exception was thrown and the response does not have an
# error_token field, presume success
if result is None or 'error_token' in result or 'uuid' not in result:
    logger.error("API server returned an error result: {}".format(result))
    # NOTE(review): an exit(1) is presumably missing here -- confirm.

print(result['uuid'])

if result.get('partial_error'):
    logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
# NOTE(review): an `else:` line appears to be missing here.
    logger.info("Success: created copy with uuid {}".format(result['uuid']))
def set_src_owner_uuid(resource, uuid, args):
    """Fetch the record for *uuid* through *resource* and stash its
    owner_uuid in the module-global ``src_owner_uuid`` for later use."""
    global src_owner_uuid
    request = resource.get(uuid=uuid)
    record = request.execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
203 # api_for_instance(instance_name)
205 # Creates an API client for the Arvados instance identified by
# If instance_name contains a slash, it is presumed to be a path
# (either relative or absolute) to a file with Arvados configuration
212 # Otherwise, it is presumed to be the name of a file in a standard
213 # configuration directory.
def api_for_instance(instance_name, num_retries):
    """Return an Arvados API client for the named instance.

    instance_name may be a path to a configuration file (anything
    containing a slash), or a bare identifier looked up as
    `{instance_name}.conf` in the standard configuration directories.
    An empty name falls back to the default client configuration.
    """
    if not instance_name:
        # Use environment-based default configuration.
        return arvados.api('v1')

    if '/' in instance_name:
        # A slash means an explicit (relative or absolute) file path.
        config_file = instance_name
        # NOTE(review): an `else:` branch header appears to be missing
        # before the next two lines (bare-identifier lookup).
        dirs = arvados.util._BaseDirectories('CONFIG')
        config_file = next(dirs.search(f'{instance_name}.conf'), '')

    # NOTE(review): the `try:`/`except` scaffolding around this load
    # (which defines `verb` and `e` used below) appears to be missing.
    cfg = arvados.config.load(config_file)
        config_file = f'{instance_name}.conf'
    abort(("Could not {} config file {}: {}\n" +
           "You must make sure that your configuration tokens\n" +
           "for Arvados instance {} are in {} and that this\n" +
           "file is readable.").format(
        verb, config_file, e.strerror, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        # NOTE(review): the `api_is_insecure = (` assignment line
        # appears to be missing before this expression.
        cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
            ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             num_retries=num_retries,
        # NOTE(review): the closing of this call, a `return client`, and
        # an `else:` appear to be missing before the abort below.
    abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
# Check if git is available
def check_git_availability():
    """Abort with a helpful message unless the `git` executable runs.

    NOTE(review): the `try:` and `subprocess.run(` lines appear to be
    missing from this fragment.
    """
        ['git', '--version'],
        stdout=subprocess.DEVNULL,
    except FileNotFoundError:
        abort('git command is not available. Please ensure git is installed.')
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, str):
        # NOTE(review): the yield statements for both branches appear to
        # be missing from this fragment (presumably `yield arg` for the
        # string case and iteration otherwise -- confirm upstream).
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository. If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        # Simple equality test against the source repository name.
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        # Single-element `in` test against the source repository name.
        repo_filter[2] = [dst_repository]
    # NOTE(review): an `else:` line appears to be missing here.
        raise ValueError("repository filter is not a simple source match")
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes. If they're not, it raises ValueError to indicate that
    the filter is not portable. It could be extended to make other
    transformations in the future.
    """
    for operand in filter_iter(version_filter[2]):
        if not COMMIT_HASH_RE.match(operand):
            raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    for name in filter_iter(filter_[0]):
        if name == 'any' or name in attr_names:
            return True
    return False
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    # NOTE(review): the `try:` / `yield` lines appear to be missing
    # from this fragment.
    except exc_types as error:
        # NOTE(review): the `handler(error)` call appears to be missing.
320 # copy_workflow(wf_uuid, src, dst, args)
322 # Copies a workflow identified by wf_uuid from src to dst.
324 # If args.recursive is True, also copy any collections
325 # referenced in the workflow definition yaml.
327 # The owner_uuid of the new workflow is set to any given
328 # project_uuid or the user who copied the template.
330 # Returns the copied workflow object.
def copy_workflow(wf_uuid, src, dst, args):
    """Copy the workflow wf_uuid from src to dst and return the workflow
    record created or updated at the destination.  When args.recursive is
    set, the Keep dependencies reported by arvados-cwl-runner are copied
    first via copy_collections()."""
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    if not wf["definition"]:
        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))

    # copy collections and docker images
    if args.recursive and wf["definition"]:
        # Point arvados-cwl-runner at the *source* cluster so it can
        # resolve the workflow's Keep dependencies.
        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
               "ARVADOS_API_TOKEN": src.api_token,
               "PATH": os.environ["PATH"]}
        # NOTE(review): the `try:` wrapping this subprocess call appears
        # to be missing from this fragment.
        result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
                                capture_output=True, env=env)
        except FileNotFoundError:
            no_arv_copy = result.returncode == 2
            # NOTE(review): an `if no_arv_copy:` test appears to be
            # missing before this raise.
            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
        elif result.returncode != 0:
            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')

        locations = json.loads(result.stdout)

        copy_collections(locations, src, dst, args)

    # copy the workflow itself
    wf['owner_uuid'] = args.project_uuid

    # Create the workflow if no workflow of the same name exists in the
    # destination project; otherwise update that one in place.
    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
                                             ["name", "=", wf["name"]]]).execute()
    if len(existing["items"]) == 0:
        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
    # NOTE(review): an `else:` line appears to be missing here.
        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
def workflow_collections(obj, locations, docker_images):
    """Recursively scan a parsed workflow definition for keep: references
    and docker image identifiers, appending block locations to
    `locations` and recording image->tag pairs in `docker_images`."""
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        # NOTE(review): a `if loc is not None:` guard appears to be
        # missing before this startswith test.
        if loc.startswith("keep:"):
            # Strip the "keep:" scheme prefix.
            locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            # Images without an explicit tag default to 'latest'.
            tag = ds[1] if len(ds)==2 else 'latest'
            docker_images[ds[0]] = tag

        # NOTE(review): the `for x in obj:` loop header appears missing.
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        # NOTE(review): the `for x in obj:` loop header appears missing.
            workflow_collections(x, locations, docker_images)
393 # copy_collections(obj, src, dst, args)
395 # Recursively copies all collections referenced by 'obj' from src
396 # to dst. obj may be a dict or a list, in which case we run
397 # copy_collections on every value it contains. If it is a string,
398 # search it for any substring that matches a collection hash or uuid
399 # (this will find hidden references to collections like
400 # "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
402 # Returns a copy of obj with any old collection uuids replaced by
def copy_collections(obj, src, dst, args):
    """Recursively copy every collection referenced by `obj` from src to
    dst, returning a copy of `obj` with old collection identifiers
    replaced by the destination ones."""

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination. Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                # Same identifier at the destination; substitution is a
                # no-op but record it so we don't copy again.
                collections_copied[src_id] = src_id
            # NOTE(review): an `else:` line appears to be missing here.
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, str):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        # NOTE(review): a `return obj` appears to be missing here.
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
        # NOTE(review): the generator's `for v in obj` clause and closing
        # parenthesis appear to be missing here.
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    # NOTE(review): a fallthrough `return obj` appears to be missing here.
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks).

    Walks every manifest line; the first word of each line is the stream
    name, and the remaining words are block locators or file tokens.
    Words that do not parse as Keep locators are skipped, and each
    distinct block (by md5sum) is counted once.
    """
    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size
    return total_bytes
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available.
    """
    collection_uuid = c['uuid']
    # NOTE(review): the `body = {}` initializer and the per-field
    # `if d in c: body[d] = c[d]` copy appear to be missing here.
    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
    # NOTE(review): a guard (presumably `if not body.get('name'):`)
    # appears to be missing before this default name.
    body['name'] = "copied from " + collection_uuid

    if args.storage_classes:
        body['storage_classes_desired'] = args.storage_classes

    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
491 # copy_collection(obj_uuid, src, dst, args)
493 # Copies the collection identified by obj_uuid from src to dst.
494 # Returns the collection object created at dst.
496 # If args.progress is True, produce a human-friendly progress
499 # If a collection with the desired portable_data_hash already
500 # exists at dst, and args.force is False, copy_collection returns
501 # the existing collection without copying any blocks. Otherwise
502 # (if no collection exists or if args.force is True)
503 # copy_collection copies all of the collection data blocks from src
506 # For this application, it is critical to preserve the
507 # collection's manifest hash, which is not guaranteed with the
508 # arvados.CollectionReader and arvados.CollectionWriter classes.
509 # Copying each block in the collection manually, followed by
510 # the manifest block, ensures that the collection's manifest
511 # hash will not change.
def copy_collection(obj_uuid, src, dst, args):
    """Copy the collection identified by obj_uuid (a collection UUID or a
    portable data hash) from src to dst, block by block, and return the
    collection record created at dst.

    NOTE(review): this fragment appears to be missing many lines (branch
    headers, try/except scaffolding, variable initializers).  The inline
    notes below mark the gaps that are evident from the visible code.
    """
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection. As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")
        # NOTE(review): an `if not items:` guard appears missing here.
        logger.warning("Could not find collection with portable data hash %s", obj_uuid)
        # There's only one collection with the PDH, so use that.
        # NOTE(review): the single-item branch body and the multi-item
        # `for i in items:` loop header appear to be missing here.
        # See if there is a collection that's in the same project
        # as the root item (usually a workflow) being copied.
        if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
        # Didn't find any collections located in the same project, so
        # pick the oldest collection that has a name assigned to it.
        # None of the collections have names (?!), so just pick the
        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    # NOTE(review): an `else:` for the plain-UUID case appears missing.
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # NOTE(review): the `if not args.force:` header appears missing.
    if 'portable_data_hash' in c:
        colhash = c['portable_data_hash']
    # NOTE(review): an `else: colhash = c['uuid']` appears missing here.
    dstcol = dst.collections().list(
        filters=[['portable_data_hash', '=', colhash]]
    ).execute(num_retries=args.retries)
    if dstcol['items_available'] > 0:
        for d in dstcol['items']:
            if ((args.project_uuid == d['owner_uuid']) and
                (c.get('name') == d['name']) and
                (c['portable_data_hash'] == d['portable_data_hash'])):
            # NOTE(review): the matched-item return appears missing.
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = io.StringIO()
    # NOTE(review): `dst_locators = {}` and `bytes_written = 0`
    # initializers appear to be missing here.
    bytes_expected = total_collection_size(manifest)
    # NOTE(review): the `if args.progress:` / `else:` headers around
    # these two assignments appear to be missing.
    progress_writer = ProgressWriter(human_progress)
    progress_writer = None

    # go through the words
    # put each block loc into 'get' queue
    # 'get' threads get block and put it into 'put' queue
    # 'put' threads put block and then update dst_locators

    # after going through the whole manifest we go back through it
    # again and build dst_manifest

    lock = threading.Lock()

    # the get queue should be unbounded because we'll add all the
    # block hashes we want to get, but these are small
    get_queue = queue.Queue()
    # NOTE(review): the `threadcount` assignment appears to be missing.

    # the put queue contains full data blocks
    # and if 'get' is faster than 'put' we could end up consuming
    # a great deal of RAM if it isn't bounded.
    put_queue = queue.Queue(threadcount)
    # NOTE(review): the `transfer_error` initializer and the
    # `def get_thread():` header with its loop and try/except
    # scaffolding appear to be missing in the section below.
    word = get_queue.get()
    get_queue.task_done()
    blockhash = arvados.KeepLocator(word).md5sum
    if blockhash in dst_locators:
        get_queue.task_done()
    logger.debug("Getting block %s", word)
    data = src_keep.get(word)
    put_queue.put((word, data))
    logger.error("Error getting block %s: %s", word, e)
    transfer_error.append(e)
    # Drain the 'get' queue so we end early
    get_queue.task_done()
    get_queue.task_done()
    # NOTE(review): the `def put_thread():` header with its loop and
    # try/except scaffolding appears to be missing in the section below.
    nonlocal bytes_written
    item = put_queue.get()
    put_queue.task_done()
    loc = arvados.KeepLocator(word)
    blockhash = loc.md5sum
    if blockhash in dst_locators:
        put_queue.task_done()
    logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
    dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
    dst_locators[blockhash] = dst_locator
    bytes_written += loc.size
    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
    logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
    # Drain the 'get' queue so we end early
    get_queue.task_done()
    transfer_error.append(e)
    put_queue.task_done()

    # Enqueue every block locator found in the manifest.
    for line in manifest.splitlines():
        # NOTE(review): `words = line.split()` appears to be missing.
        for word in words[1:]:
            # NOTE(review): the `try:`/`except ValueError:` wrapping this
            # parse and the enqueue call appear to be missing.
            loc = arvados.KeepLocator(word)
            # If 'word' can't be parsed as a locator,
            # presume it's a filename.

    for i in range(0, threadcount):
        # NOTE(review): the sentinel `get_queue.put(None)` appears missing.
    for i in range(0, threadcount):
        threading.Thread(target=get_thread, daemon=True).start()

    for i in range(0, threadcount):
        threading.Thread(target=put_thread, daemon=True).start()

    # NOTE(review): the queue join calls appear to be missing before
    # this error check.
    if len(transfer_error) > 0:
        return {"error_token": "Failed to transfer blocks"}

    # Second pass: rebuild the manifest with destination-signed locators.
    for line in manifest.splitlines():
        # NOTE(review): `words = line.split()` appears to be missing.
        dst_manifest.write(words[0])
        for word in words[1:]:
            # NOTE(review): the `try:`/`except ValueError:` wrapping this
            # parse appears to be missing.
            loc = arvados.KeepLocator(word)
            # If 'word' can't be parsed as a locator,
            # presume it's a filename.
            dst_manifest.write(' ')
            dst_manifest.write(word)
            blockhash = loc.md5sum
            dst_manifest.write(' ')
            dst_manifest.write(dst_locators[blockhash])
        dst_manifest.write("\n")

    # NOTE(review): an `if progress_writer:` guard appears missing here.
    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
    progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.
    """
    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        # The image identifier itself is a Keep locator; copy it directly.
        dst_image_col = copy_collection(docker_image, src, dst, args)
    # NOTE(review): an `else:` line appears to be missing here.
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
def copy_project(obj_uuid, src, dst, owner_uuid, args):
    """Copy project obj_uuid (and, recursively, its contents) from src to
    dst under the destination project owner_uuid.  Returns the destination
    project record, with a 'partial_error' entry describing any items
    that failed to copy."""
    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
    # NOTE(review): an `else:` line appears to be missing here.
        project_record = existing["items"][0]

    dst.groups().update(uuid=project_record["uuid"],
                        # NOTE(review): the opening of the body argument
                        # appears to be truncated here.
                        "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    # NOTE(review): the `partial_error` initializer and the `try:`
    # wrapping the collection-copy call appear to be missing here.
    copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy each workflow owned by the source project, accumulating
    # failures rather than aborting the whole project copy.
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        # NOTE(review): a `try:` line appears to be missing here.
        copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    # Recurse into subprojects.
    for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
        # NOTE(review): a `try:` line appears to be missing here.
        copy_project(g["uuid"], src, dst, project_record["uuid"], args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    # NOTE(review): an `if partial_error:` guard appears missing here.
    project_record["partial_error"] = partial_error

    return project_record
823 # git_rev_parse(rev, repo)
825 # Returns the 40-character commit hash corresponding to 'rev' in
826 # git repository 'repo' (which must be the path of a local git
def git_rev_parse(rev, repo):
    """Return the 40-character commit hash corresponding to 'rev' in
    git repository 'repo' (which must be the path of a local git
    repository).

    Raises subprocess.CalledProcessError if `git rev-parse` exits
    nonzero.
    """
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        cwd=repo,
        check=True,
        stdout=subprocess.PIPE,
        text=True,
    )
    # CompletedProcess.stdout is already a decoded string (text=True),
    # not a file object, so strip it directly -- the original
    # `proc.stdout.read()` would raise AttributeError.
    return proc.stdout.strip()
839 # uuid_type(api, object_uuid)
841 # Returns the name of the class that object_uuid belongs to, based on
842 # the second field of the uuid. This function consults the api's
843 # schema to identify the object class.
845 # It returns a string such as 'Collection', 'Workflow', etc.
847 # Special case: if handed a Keep locator hash, return 'Collection'.
def uuid_type(api, object_uuid):
    """Return the schema class name ('Collection', 'Workflow', ...) that
    object_uuid belongs to, based on the uuid's type infix; Keep locator
    hashes map to 'Collection', and http(s) URLs are special-cased."""
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        # NOTE(review): `return 'Collection'` appears to be missing here.

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        # NOTE(review): the return for the URL case appears missing here.

    p = object_uuid.split('-')
    # NOTE(review): validation of `p` and the `type_prefix` assignment
    # (presumably p[1]) appear to be missing here.
    for k in api._schema.schemas:
        obj_class = api._schema.schemas[k].get('uuidPrefix', None)
        if type_prefix == obj_class:
            # NOTE(review): `return k` appears to be missing here.
def copy_from_http(url, src, dst, args):
    """Copy the resource at *url* into Keep on the destination cluster.

    If the source cluster already holds a cached copy of the URL in
    Keep, that collection is copied across; otherwise the URL is
    downloaded directly into Keep at the destination.  Returns a dict
    containing a 'uuid' key on success.
    """
    url_opts = dict(
        varying_url_params=args.varying_url_params,
        prefer_cached_downloads=args.prefer_cached_downloads,
    )
    hit = arvados.http_to_keep.check_cached_url(
        src, args.project_uuid, url, {}, **url_opts)
    if hit[2] is not None:
        # Source cluster already has this URL in Keep: copy that
        # collection to the destination.
        return copy_collection(hit[2], src, dst, args)

    stored = arvados.http_to_keep.http_to_keep(
        dst, args.project_uuid, url, **url_opts)
    if stored is not None:
        return {"uuid": stored[2]}
def abort(msg, code=1):
    """Log msg and terminate the program with exit status `code`.

    This function does not return; the visible fragment was missing the
    exit call even though callers rely on abort() stopping execution.
    """
    logger.info("arv-copy: %s", msg)
    sys.exit(code)
891 # Code for reporting on the progress of a collection upload.
892 # Stolen from arvados.commands.put.ArvPutCollectionWriter
893 # TODO(twp): figure out how to refactor into a shared library
894 # (may involve refactoring some arvados.commands.arv_copy.copy_collection
def machine_progress(obj_uuid, bytes_written, bytes_expected):
    """Render a machine-readable one-line progress report.

    NOTE(review): several format arguments appear to be missing from
    this fragment -- the format string expects five values but only the
    bytes_expected expression is visible.  Verify against upstream.
    """
    return "{} {}: {} {} written {} total\n".format(
        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
    """Render a human-friendly progress line (with a leading carriage
    return so successive reports overwrite each other in place).

    When the expected total is known and nonzero, show megabytes written
    out of megabytes expected plus a percentage; otherwise just show the
    raw byte count.  The missing branch headers around the two visible
    return statements have been restored here.
    """
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter(object):
    """Write progress lines produced by a progress_func callback.

    NOTE(review): this fragment appears to be missing the `outfile`
    class attribute definition and the `def finish(self):` header for
    the trailing write below -- verify against upstream.
    """
    # Callback that formats a progress line; None disables reporting.
    _progress_func = None

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            # NOTE(review): the `self.outfile.write(` call wrapping this
            # expression appears to be missing here.
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

            self.outfile.write("\n")
929 if __name__ == '__main__':