1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 # arv-copy [--recursive] [--no-recursive] object-uuid
7 # Copies an object from Arvados instance src to instance dst.
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
16 # The user must have files $HOME/.config/arvados/{src}.conf and
17 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
18 # instances src and dst. If either of these files is not found,
19 # arv-copy will issue an error.
41 import arvados.commands._util as arv_cmd
42 import arvados.commands.keepdocker
43 import arvados.http_to_keep
45 from arvados._version import __version__
# Matches a full or abbreviated (1-40 hex digit) Git commit hash.
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# local directories,
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
# NOTE(review): the `local_repo_dir = {}` assignment itself is not visible
# in this copy, though main() iterates local_repo_dir during cleanup.

# List of collections that have been copied in this session, and their
# destination collection UUIDs.  Maps source uuid/PDH -> destination uuid;
# consulted by copy_collections() so each collection is copied at most once.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
# NOTE(review): the `src_owner_uuid = None` initialization is not visible
# in this copy; set_src_owner_uuid() assigns it via `global`.
    # NOTE(review): this listing is missing lines, including the enclosing
    # `def main():` header and several if/else/try lines annotated below.
    # Verify against the upstream file before editing.

    # Options shared with the top-level parser (add_help=False so they can
    # be used as a parent parser).
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--src', dest='source_arvados',
        help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will be inferred from the UUID of the object being copied.')
    copy_opts.add_argument(
        '--dst', dest='destination_arvados',
        help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will use ARVADOS_API_HOST from environment.')
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object, and subprojects. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies or subprojects.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
    copy_opts.add_argument(
        '--storage-classes', dest='storage_classes',
        # NOTE(review): "destinaton" typo in the help text below; fix separately.
        help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
    copy_opts.add_argument("--varying-url-params", type=str, default="",
                           help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")

    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
                           help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")

    # NOTE(review): the positional argument name line (presumably
    # 'object_uuid') is missing from this copy.
    copy_opts.add_argument(
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)

    parser = argparse.ArgumentParser(
        description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    # Normalize "--storage-classes a, b ,c" into ['a', 'b', 'c'].
    if args.storage_classes:
        args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]

        # NOTE(review): the `if args.verbose:` / `else:` lines around these
        # two setLevel calls are missing from this copy.
        logger.setLevel(logging.DEBUG)
        logger.setLevel(logging.INFO)

    # Infer the source cluster from the object UUID's cluster-id prefix.
    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
        args.source_arvados = args.object_uuid[:5]

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados, args.retries)
    dst_arv = api_for_instance(args.destination_arvados, args.retries)

    # Default the destination project to the current user's home project.
    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)

        # NOTE(review): the `try:` line and some elif/else branch headers in
        # this dispatch are missing from this copy.
        if t == 'Collection':
            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
            result = copy_collection(args.object_uuid,
        elif t == 'Workflow':
            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
            result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
    except Exception as e:
        # Show the traceback only in --verbose mode.
        logger.error("%s", e, exc_info=args.verbose)

    # Clean up any outstanding temp git repositories.
    for d in local_repo_dir.values():
        shutil.rmtree(d, ignore_errors=True)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if result is None or 'error_token' in result or 'uuid' not in result:
        logger.error("API server returned an error result: {}".format(result))

    # The copied object's uuid on stdout is the documented success output.
    print(result['uuid'])

    if result.get('partial_error'):
        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))

    logger.info("Success: created copy with uuid {}".format(result['uuid']))
def set_src_owner_uuid(resource, uuid, args):
    """Record the owner_uuid of the object being copied.

    Fetches `uuid` through the given API `resource` and stores its
    owner_uuid in the module-global `src_owner_uuid`, which
    copy_collection() later consults when choosing a name for a copy
    identified only by portable data hash.
    """
    global src_owner_uuid
    src_owner_uuid = (
        resource.get(uuid=uuid)
        .execute(num_retries=args.retries)
        .get("owner_uuid"))
# api_for_instance(instance_name, num_retries)
#
# Creates an API client for the Arvados instance identified by
# instance_name, retrying failed requests up to num_retries times.
#
# If instance_name contains a slash, it is presumed to be a path
# (either local or absolute) to a file with Arvados configuration
# settings.
#
# Otherwise, it is presumed to be the name of a file in
# $HOME/.config/arvados/instance_name.conf
#
def api_for_instance(instance_name, num_retries):
    """Return an Arvados API client for the named instance.

    An empty/None instance_name falls back to the ARVADOS_API_*
    environment variables.  Aborts the program (via abort()) when the
    config file cannot be read or lacks the required credentials.
    """
    if not instance_name:
        # Use environment
        return arvados.api('v1')

    if '/' in instance_name:
        config_file = instance_name
    else:
        # expanduser('~') is more robust than os.environ['HOME'], which
        # raises KeyError when HOME is unset (e.g. some cron/daemon setups).
        config_file = os.path.join(os.path.expanduser('~'), '.config', 'arvados', "{}.conf".format(instance_name))

    try:
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        # Accept the usual truthy spellings for the "insecure" flag.
        api_is_insecure = (
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             num_retries=num_retries,
                             )
    else:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
    return client
# Check if git is available
def check_git_availability():
    """Abort with a friendly message unless `git` can be executed.

    Previously a git binary that existed but exited nonzero raised an
    uncaught CalledProcessError traceback; treat that like a missing git.
    """
    try:
        subprocess.run(
            ['git', '--version'],
            check=True,
            stdout=subprocess.DEVNULL,
        )
    except (FileNotFoundError, subprocess.CalledProcessError):
        abort('git command is not available. Please ensure git is installed.')
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, str):
        # A bare string stands for a one-element list.
        yield arg
    else:
        yield from arg
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository.  If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        # ['repository', '=', src] -> ['repository', '=', dst]
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        # ['repository', 'in', [src]] -> ['repository', 'in', [dst]]
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes.  If they're not, it raises ValueError to indicate that
    the filter is not portable.  It could be extended to make other
    transformations in the future.
    """
    # version_filter[2] may be a single string or a list; filter_iter
    # normalizes both shapes.
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    # The wildcard 'any' matches every attribute.
    for name in filter_iter(filter_[0]):
        if name == 'any' or name in attr_names:
            return True
    return False
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        # A tuple of exception types is valid in an `except` clause; an
        # empty tuple catches nothing, so exceptions then propagate.
        handler(error)
# copy_workflow(wf_uuid, src, dst, args)
#
# Copies a workflow identified by wf_uuid from src to dst.
#
# If args.recursive is True, also copy any collections
# referenced in the workflow definition yaml.
#
# The owner_uuid of the new workflow is set to any given
# project_uuid or the user who copied the template.
#
# Returns the copied workflow object.
#
# NOTE(review): several lines are missing from this copy (the try:,
# else:, and if-guard lines around the subprocess call, and the
# `del wf['uuid']` normally done before reusing the record body);
# verify against the upstream file before editing.
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    # An empty definition is still copyable; warn rather than abort.
    if not wf["definition"]:
        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))

    # copy collections and docker images
    if args.recursive and wf["definition"]:
        # Point arvados-cwl-runner at the *source* cluster so it can
        # resolve the workflow's Keep dependencies there.
        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
               "ARVADOS_API_TOKEN": src.api_token,
               "PATH": os.environ["PATH"]}
        result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
                                capture_output=True, env=env)
        except FileNotFoundError:
        # Exit code 2 here indicates a runner too old to support
        # --print-keep-deps.
        no_arv_copy = result.returncode == 2
            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
        elif result.returncode != 0:
            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')

        # stdout of --print-keep-deps is a JSON list of Keep references.
        locations = json.loads(result.stdout)
            copy_collections(locations, src, dst, args)

    # copy the workflow itself
    wf['owner_uuid'] = args.project_uuid

    # Update a same-named workflow already in the target project,
    # otherwise create a fresh record.
    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
                                             ["name", "=", wf["name"]]]).execute()
    if len(existing["items"]) == 0:
        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
def workflow_collections(obj, locations, docker_images):
    """Recursively scan a workflow structure for Keep and Docker references.

    Walks dicts and lists, appending any `location` value of the form
    "keep:..." (minus the prefix) to `locations`, and recording any
    Docker image reference as docker_images[repo] = tag (defaulting to
    'latest' when no tag is given).  Both accumulators are mutated
    in place; nothing is returned.
    """
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        # Any of these keys may carry the image reference.
        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds) == 2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)
# copy_collections(obj, src, dst, args)
#
# Recursively copies all collections referenced by 'obj' from src
# to dst.  obj may be a dict or a list, in which case we run
# copy_collections on every value it contains.  If it is a string,
# search it for any substring that matches a collection hash or uuid
# (this will find hidden references to collections like
# "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
# Returns a copy of obj with any old collection uuids replaced by
# the new ones.
#
# NOTE(review): a few lines are missing from this copy (an else: inside
# the helper, the dict-comprehension's `for v in ...` continuation, and
# the final fallthrough return); verify against the upstream file.
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        # collections_copied memoizes src -> dst so each collection is
        # copied at most once per session.
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                # The identifier is unchanged at the destination (e.g. a
                # portable data hash): map it to itself.
                collections_copied[src_id] = src_id
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, str):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
    elif isinstance(obj, dict):
        # Preserve the concrete mapping type of the input.
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""
    total_bytes = 0
    # Tracks block md5sums already counted so shared blocks count once.
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        # words[0] is the stream name; the rest are locators or filenames.
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available.

    NOTE(review): this copy is missing the `body = {}` initialization,
    the loop body copying each field, and the `if not body["name"]:`
    guard; verify against the upstream file before editing.
    """
    collection_uuid = c['uuid']
    # Carry over only the fields that make sense on the destination.
    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):

    # Fall back to a synthesized name when the source had none.
        body['name'] = "copied from " + collection_uuid

    if args.storage_classes:
        body['storage_classes_desired'] = args.storage_classes

    # The copy lands in the project chosen on the command line.
    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            # Re-point the link at the new collection and project.
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
# copy_collection(obj_uuid, src, dst, args)
#
# Copies the collection identified by obj_uuid from src to dst.
# Returns the collection object created at dst.
#
# If args.progress is True, produce a human-friendly progress
# report on stderr.
#
# If a collection with the desired portable_data_hash already
# exists at dst, and args.force is False, copy_collection returns
# the existing collection without copying any blocks.  Otherwise
# (if no collection exists or if args.force is True)
# copy_collection copies all of the collection data blocks from src
# to dst.
#
# For this application, it is critical to preserve the
# collection's manifest hash, which is not guaranteed with the
# arvados.CollectionReader and arvados.CollectionWriter classes.
# Copying each block in the collection manually, followed by
# the manifest block, ensures that the collection's manifest
# hash will not change.
#
# NOTE(review): this copy of the function is missing many lines (branch
# headers, try/except/finally lines, the get_thread/put_thread def
# lines, threadcount/dst_locators/transfer_error initialization, and
# several statements); verify against the upstream file before editing.
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection.  As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.  Candidates come back oldest-first.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")

        # No matching collection found on the source.
        logger.warning("Could not find collection with portable data hash %s", obj_uuid)

        # There's only one collection with the PDH, so use that.

        # See if there is a collection that's in the same project
        # as the root item (usually a workflow) being copied.
        if i.get("owner_uuid") == src_owner_uuid and i.get("name"):

        # Didn't find any collections located in the same project, so
        # pick the oldest collection that has a name assigned to it.

        # None of the collections have names (?!), so just pick the
        # first one.

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same maninfest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if 'portable_data_hash' in c:
        colhash = c['portable_data_hash']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                # An identical copy already in the target project can be
                # reused as-is.
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
            # Otherwise reuse the first match's manifest and create a new
            # record from it in the target project (no block copy needed).
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = io.StringIO()

    bytes_expected = total_collection_size(manifest)
        # Only one of these two assignments runs (if args.progress:/else:).
        progress_writer = ProgressWriter(human_progress)
        progress_writer = None

    # go through the words
    # put each block loc into 'get' queue
    # 'get' threads get block and put it into 'put' queue
    # 'put' threads put block and then update dst_locators
    #
    # after going through the whole manifest we go back through it
    # again and build dst_manifest

    # Guards the shared dst_locators/bytes_written state below.
    lock = threading.Lock()

    # the get queue should be unbounded because we'll add all the
    # block hashes we want to get, but these are small
    get_queue = queue.Queue()

    # the put queue contains full data blocks
    # and if 'get' is faster than 'put' we could end up consuming
    # a great deal of RAM if it isn't bounded.
    put_queue = queue.Queue(threadcount)

    # --- 'get' worker: fetch blocks from the source Keep ---
            word = get_queue.get()
            # A None entry is the shutdown sentinel.
                get_queue.task_done()

            blockhash = arvados.KeepLocator(word).md5sum
                if blockhash in dst_locators:
                    # Block already transferred; skip.
                    get_queue.task_done()

                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                    # Drain the 'get' queue so we end early
                        get_queue.task_done()

                get_queue.task_done()

    # --- 'put' worker: store blocks in the destination Keep ---
        nonlocal bytes_written
            item = put_queue.get()
            # A None entry is the shutdown sentinel.
                put_queue.task_done()

            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum

                if blockhash in dst_locators:
                    # Another worker already stored this block.
                    put_queue.task_done()

                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
                    # Under the shared lock: record the signed locator and
                    # update progress accounting.
                    dst_locators[blockhash] = dst_locator
                    bytes_written += loc.size
                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
                    # Drain the 'get' queue so we end early
                        get_queue.task_done()
                transfer_error.append(e)
                put_queue.task_done()

    # First pass: enqueue every block locator found in the manifest.
    for line in manifest.splitlines():
        for word in words[1:]:
                loc = arvados.KeepLocator(word)
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.

    # One sentinel per worker so every thread shuts down.
    for i in range(0, threadcount):

    for i in range(0, threadcount):
        threading.Thread(target=get_thread, daemon=True).start()

    for i in range(0, threadcount):
        threading.Thread(target=put_thread, daemon=True).start()

    # Any recorded error aborts the whole copy with an error token.
    if len(transfer_error) > 0:
        return {"error_token": "Failed to transfer blocks"}

    # Second pass: rewrite the manifest using the new signed locators.
    for line in manifest.splitlines():
        dst_manifest.write(words[0])
        for word in words[1:]:
                loc = arvados.KeepLocator(word)
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest.write(' ')
                dst_manifest.write(word)
            blockhash = loc.md5sum
            dst_manifest.write(' ')
            dst_manifest.write(dst_locators[blockhash])
        dst_manifest.write("\n")

        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst.  Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst
    (the links are created by copy_collection -> create_collection_from).
    """
    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        # The caller passed a raw Keep locator rather than an image name.
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
def copy_project(obj_uuid, src, dst, owner_uuid, args):
    """Copy project obj_uuid, with its collections, workflows and
    subprojects, from src to dst under owner_uuid.

    Per-item failures are accumulated into the returned record's
    'partial_error' rather than aborting the whole copy.

    NOTE(review): several lines are missing from this copy (an else:
    branch, a body= continuation line, the partial_error initialization
    and the per-section try:/if lines); verify against upstream.
    """
    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
        # Reuse the existing same-named project.
        project_record = existing["items"][0]

    # Sync the description onto the (new or reused) destination project.
    dst.groups().update(uuid=project_record["uuid"],
                        "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    # Subsequent copies below land inside this destination project.
    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    # Copy every collection owned by the source project.
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows, continuing past individual failures.
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    # Recurse into subprojects.
    for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
            copy_project(g["uuid"], src, dst, project_record["uuid"], args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    # Surface accumulated errors to the caller via the returned record.
    project_record["partial_error"] = partial_error

    return project_record
# git_rev_parse(rev, repo)
#
# Returns the 40-character commit hash corresponding to 'rev' in
# git repository 'repo' (which must be the path of a local git
# repository)
#
def git_rev_parse(rev, repo):
    """Resolve `rev` to a full commit hash using the repo at path `repo`.

    Raises subprocess.CalledProcessError if git exits nonzero.
    """
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        check=True,
        cwd=repo,
        stdout=subprocess.PIPE,
        text=True,
    )
    # BUG FIX: CompletedProcess.stdout is already a str (text=True), not a
    # file object -- the old `proc.stdout.read().strip()` raised
    # AttributeError.  Strip the captured string directly.
    return proc.stdout.strip()
# uuid_type(api, object_uuid)
#
# Returns the name of the class that object_uuid belongs to, based on
# the second field of the uuid.  This function consults the api's
# schema to identify the object class.
#
# It returns a string such as 'Collection', 'Workflow', etc.
#
# Special case: if handed a Keep locator hash, return 'Collection'.
# Special case: if handed an http(s) URL, return 'httpURL'.
#
def uuid_type(api, object_uuid):
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        return 'Collection'

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        return 'httpURL'

    # Arvados uuids look like "zzzzz-xxxxx-yyyyyyyyyyyyyyy"; the middle
    # field identifies the object class.
    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    # Unknown or malformed identifier.
    return None
def copy_from_http(url, src, dst, args):
    """Copy the content at `url` into a Keep collection on dst.

    First checks whether the URL is already cached as a collection on
    the source cluster; if so, that collection is copied to dst.
    Otherwise the URL is downloaded directly into Keep on dst.
    NOTE(review): a few original lines appear to be missing from this
    copy; verify against the upstream file.
    """
    project_uuid = args.project_uuid
    varying_url_params = args.varying_url_params
    prefer_cached_downloads = args.prefer_cached_downloads

    # `cached` is indexable; cached[2] carries the collection identifier
    # (or None when the URL is not cached).
    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
                                                   varying_url_params=varying_url_params,
                                                   prefer_cached_downloads=prefer_cached_downloads)
    if cached[2] is not None:
        # Already stored on the source cluster -- copy that collection.
        return copy_collection(cached[2], src, dst, args)

    # Not cached at the source: fetch the URL straight into dst's Keep.
    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
                                               varying_url_params=varying_url_params,
                                               prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}
def abort(msg, code=1):
    """Log `msg` and terminate the program with exit status `code`.

    Kept at info level (matching the original) so the message is visible
    without being double-reported as an error by callers' handlers.
    """
    logger.info("arv-copy: %s", msg)
    exit(code)
# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    """Render a machine-parseable progress line.

    Format: "<argv0> <pid>: <uuid> <written> written <total> total\\n",
    with total reported as -1 when bytes_expected is None (unknown).
    """
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
    """Render a human-friendly one-line progress string.

    When the expected total is known (truthy), shows MiB written / total
    and a percentage; otherwise just the raw byte count.  The leading
    carriage return lets successive calls overwrite the same line.
    """
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter(object):
    """Write progress lines produced by a formatter function to stderr.

    The formatter (e.g. human_progress/machine_progress) is called with
    (obj_uuid, bytes_written, bytes_expected); a None formatter makes
    report() a no-op.
    """
    # Formatter callable; None disables reporting.
    _progress_func = None
    # Class-level default output stream; instances/tests may override.
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        # Terminate the (carriage-return-overwritten) progress line.
        self.outfile.write("\n")
911 if __name__ == '__main__':