# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
# arv-copy [--recursive] [--no-recursive] object-uuid
#
# Copies an object from Arvados instance src to instance dst.
#
# By default, arv-copy recursively copies any dependent objects
# necessary to make the object functional in the new instance
# (e.g. for a workflow, arv-copy copies the workflow,
# input collections, and docker images). If
# --no-recursive is given, arv-copy copies only the single record
# identified by object-uuid.
#
# The user must have configuration files {src}.conf and {dst}.conf in a
# standard configuration directory with valid login credentials for
# instances src and dst. If either of these files is not found,
# arv-copy will issue an error.
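#
# Example invocation (cluster ids and uuid are illustrative):
#
#   arv-copy --src pirca --dst tordo pirca-4zz18-0123456789abcde
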
import argparse
import contextlib
import io
import json
import logging
import os
import queue
import re
import shutil
import subprocess
import sys
import threading
import urllib.parse

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
import arvados.http_to_keep

from arvados._version import __version__
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')
# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# directories.
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
local_repo_dir = {}
# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()
# The owner_uuid of the object being copied
src_owner_uuid = None

def main():
    copy_opts = argparse.ArgumentParser(add_help=False)
    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--src', dest='source_arvados',
        help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for finding "foo.conf" under a systemd or XDG configuration directory. If not provided, will be inferred from the UUID of the object being copied.')
    copy_opts.add_argument(
        '--dst', dest='destination_arvados',
        help='The cluster id of the destination Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for finding "foo.conf" under a systemd or XDG configuration directory. If not provided, will use ARVADOS_API_HOST from environment.')
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object, and subprojects. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies or subprojects.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
    copy_opts.add_argument(
        '--storage-classes', dest='storage_classes',
        help='Comma separated list of storage classes to be used when saving data to the destination Arvados instance.')
    copy_opts.add_argument("--varying-url-params", type=str, default="",
                           help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
                           help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)
    parser = argparse.ArgumentParser(
        description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.storage_classes:
        args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
    if args.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)
    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
        args.source_arvados = args.object_uuid[:5]

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados, args.retries)
    dst_arv = api_for_instance(args.destination_arvados, args.retries)

    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)
    try:
        if t == 'Collection':
            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
            result = copy_collection(args.object_uuid,
                                     src_arv, dst_arv,
                                     args)
        elif t == 'Workflow':
            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
        elif t == 'Group':
            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
        elif t == 'httpURL':
            result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
        else:
            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
    except Exception as e:
        logger.error("%s", e, exc_info=args.verbose)
        exit(1)
    # Clean up any outstanding temp git repositories.
    for d in local_repo_dir.values():
        shutil.rmtree(d, ignore_errors=True)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if result is None or 'error_token' in result or 'uuid' not in result:
        if result:
            logger.error("API server returned an error result: {}".format(result))
        exit(1)

    print(result['uuid'])

    if result.get('partial_error'):
        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
        exit(1)

    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)
def set_src_owner_uuid(resource, uuid, args):
    global src_owner_uuid
    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = c.get("owner_uuid")
# api_for_instance(instance_name)
#
#     Creates an API client for the Arvados instance identified by
#     instance_name.
#
#     If instance_name contains a slash, it is presumed to be a path
#     (either local or absolute) to a file with Arvados configuration
#     credentials.
#
#     Otherwise, it is presumed to be the name of a file in a standard
#     configuration directory.
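#
#     A minimal config file might look like this (values are
#     illustrative, not real credentials):
#
#         ARVADOS_API_HOST=zzzzz.arvadosapi.com
#         ARVADOS_API_TOKEN=v2/zzzzz-gj3su-0123456789abcde/0123456789abcdef
#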
def api_for_instance(instance_name, num_retries):
    if not instance_name:
        # Use environment
        return arvados.api('v1')

    if '/' in instance_name:
        config_file = instance_name
    else:
        dirs = arvados.util._BaseDirectories('CONFIG')
        config_file = next(dirs.search(f'{instance_name}.conf'), '')

    try:
        cfg = arvados.config.load(config_file)
    except OSError as e:
        if config_file:
            verb = 'read'
        else:
            verb = 'find'
            config_file = f'{instance_name}.conf'
        abort(("Could not {} config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   verb, config_file, e.strerror, instance_name, config_file))
    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        api_is_insecure = (
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             num_retries=num_retries,
                             )
    else:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
    return client
# Check if git is available
def check_git_availability():
    try:
        subprocess.run(
            ['git', '--version'],
            check=True,
            stdout=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        abort('git command is not available. Please ensure git is installed.')
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, str):
        yield arg
    else:
        yield from arg
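# e.g. filter_iter('foo') yields just 'foo', while
# filter_iter(['=', 'foo']) yields '=' and then 'foo'.
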
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository. If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes. If they're not, it raises ValueError to indicate that
    the filter is not portable. It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    return any((name == 'any') or (name in attr_names)
               for name in filter_iter(filter_[0]))

@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)
# copy_workflow(wf_uuid, src, dst, args)
#
#    Copies a workflow identified by wf_uuid from src to dst.
#
#    If args.recursive is True, also copy any collections
#      referenced in the workflow definition yaml.
#
#    The owner_uuid of the new workflow is set to any given
#      project_uuid or the user who copied the template.
#
#    Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    if not wf["definition"]:
        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))

    # copy collections and docker images
    if args.recursive and wf["definition"]:
        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
               "ARVADOS_API_TOKEN": src.api_token,
               "PATH": os.environ["PATH"]}
        try:
            result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
                                    capture_output=True, env=env)
        except FileNotFoundError:
            no_arv_copy = True
        else:
            no_arv_copy = result.returncode == 2

        if no_arv_copy:
            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
        elif result.returncode != 0:
            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')

        locations = json.loads(result.stdout)

        if locations:
            copy_collections(locations, src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid

    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
                                             ["name", "=", wf["name"]]]).execute()
    if len(existing["items"]) == 0:
        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
    else:
        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
def workflow_collections(obj, locations, docker_images):
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds) == 2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)
# copy_collections(obj, src, dst, args)
#
#    Recursively copies all collections referenced by 'obj' from src
#    to dst.  obj may be a dict or a list, in which case we run
#    copy_collections on every value it contains.  If it is a string,
#    search it for any substring that matches a collection hash or uuid
#    (this will find hidden references to collections like
#       "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
#    Returns a copy of obj with any old collection uuids replaced by
#    the new ones.
#
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, str):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    body = {}
    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
        body[d] = c[d]

    if not body["name"]:
        body['name'] = "copied from " + collection_uuid

    if args.storage_classes:
        body['storage_classes_desired'] = args.storage_classes

    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
# copy_collection(obj_uuid, src, dst, args)
#
#    Copies the collection identified by obj_uuid from src to dst.
#    Returns the collection object created at dst.
#
#    If args.progress is True, produce a human-friendly progress
#    report.
#
#    If a collection with the desired portable_data_hash already
#    exists at dst, and args.force is False, copy_collection returns
#    the existing collection without copying any blocks.  Otherwise
#    (if no collection exists or if args.force is True)
#    copy_collection copies all of the collection data blocks from src
#    to dst.
#
#    For this application, it is critical to preserve the
#    collection's manifest hash, which is not guaranteed with the
#    arvados.CollectionReader and arvados.CollectionWriter classes.
#    Copying each block in the collection manually, followed by
#    the manifest block, ensures that the collection's manifest
#    hash will not change.
#
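#    A manifest line has the form (sizes and offsets are illustrative):
#
#        . 3229739b505d2b878b62aed09895a55a+142 0:142:foo.fastq
#
#    i.e. a stream name followed by block locators (md5+size) and file
#    tokens; only the locator words need to be rewritten with signed
#    locators from the destination cluster.
#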
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection.  As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
            ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a workflow) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)
    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = io.StringIO()
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None
    # go through the words
    # put each block loc into 'get' queue
    # 'get' threads get block and put it into 'put' queue
    # 'put' threads put block and then update dst_locators
    #
    # after going through the whole manifest we go back through it
    # again and build dst_manifest
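    #
    # (Sketch of the pipeline; None is used as a shutdown sentinel:
    #   manifest words -> get_queue -> get_thread x N -> put_queue
    #                  -> put_thread x N -> dst_locators)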
    lock = threading.Lock()

    # the get queue should be unbounded because we'll add all the
    # block hashes we want to get, but these are small
    get_queue = queue.Queue()

    threadcount = 4

    # the put queue contains full data blocks
    # and if 'get' is faster than 'put' we could end up consuming
    # a great deal of RAM if it isn't bounded.
    put_queue = queue.Queue(threadcount)
    transfer_error = []
    def get_thread():
        while True:
            word = get_queue.get()
            if word is None:
                put_queue.put(None)
                get_queue.task_done()
                return

            blockhash = arvados.KeepLocator(word).md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    get_queue.task_done()
                    continue

            try:
                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))
            except Exception as e:
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
            finally:
                get_queue.task_done()
    def put_thread():
        nonlocal bytes_written
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return

            word, data = item
            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    put_queue.task_done()
                    continue

            try:
                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
                with lock:
                    dst_locators[blockhash] = dst_locator
                    bytes_written += loc.size
                    if progress_writer:
                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
            except Exception as e:
                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
                transfer_error.append(e)
            finally:
                put_queue.task_done()
    for line in manifest.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                continue

            get_queue.put(word)

    for i in range(0, threadcount):
        get_queue.put(None)

    for i in range(0, threadcount):
        threading.Thread(target=get_thread, daemon=True).start()

    for i in range(0, threadcount):
        threading.Thread(target=put_thread, daemon=True).start()

    get_queue.join()
    put_queue.join()
    if len(transfer_error) > 0:
        return {"error_token": "Failed to transfer blocks"}
    for line in manifest.splitlines():
        words = line.split()
        dst_manifest.write(words[0])
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest.write(' ')
                dst_manifest.write(word)
                continue
            blockhash = loc.md5sum
            dst_manifest.write(' ')
            dst_manifest.write(dst_locators[blockhash])
        dst_manifest.write("\n")
    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.
    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
def copy_project(obj_uuid, src, dst, owner_uuid, args):

    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
    else:
        project_record = existing["items"][0]

    dst.groups().update(uuid=project_record["uuid"],
                        body={"group": {
                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    partial_error = ""

    # Copy collections
    try:
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
                         src, dst, args)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    # Copy projects
    for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_project(g["uuid"], src, dst, project_record["uuid"], args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    if partial_error:
        project_record["partial_error"] = partial_error

    return project_record
# git_rev_parse(rev, repo)
#
#    Returns the 40-character commit hash corresponding to 'rev' in
#    git repository 'repo' (which must be the path of a local git
#    repository)
#
def git_rev_parse(rev, repo):
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        check=True,
        cwd=repo,
        stdout=subprocess.PIPE,
        text=True,
    )
    # subprocess.run() with text=True returns captured output as a
    # string in proc.stdout, so strip it directly (no .read()).
    return proc.stdout.strip()
# uuid_type(api, object_uuid)
#
#    Returns the name of the class that object_uuid belongs to, based on
#    the second field of the uuid.  This function consults the api's
#    schema to identify the object class.
#
#    It returns a string such as 'Collection', 'Workflow', etc.
#
#    Special case: if handed a Keep locator hash, return 'Collection'.
#
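#    For example, in a uuid like "zzzzz-4zz18-0123456789abcde"
#    (illustrative), the second field "4zz18" is the uuidPrefix that
#    the discovery schema maps to the Collection class.
#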
def uuid_type(api, object_uuid):
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        return 'Collection'

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        return 'httpURL'

    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
def copy_from_http(url, src, dst, args):

    project_uuid = args.project_uuid
    varying_url_params = args.varying_url_params
    prefer_cached_downloads = args.prefer_cached_downloads

    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
                                                   varying_url_params=varying_url_params,
                                                   prefer_cached_downloads=prefer_cached_downloads)
    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
                                               varying_url_params=varying_url_params,
                                               prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}
def abort(msg, code=1):
    logger.info("arv-copy: %s", msg)
    exit(code)
# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)
def machine_progress(obj_uuid, bytes_written, bytes_expected):
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)
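# e.g. (illustrative): "arv-copy 12345: zzzzz-4zz18-0123456789abcde 1024 written 2048 total"
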
def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")

if __name__ == '__main__':
    main()