1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 # arv-copy [--recursive] [--no-recursive] object-uuid
7 # Copies an object from Arvados instance src to instance dst.
9 # By default, arv-copy recursively copies any dependent objects
10 # necessary to make the object functional in the new instance
11 # (e.g. for a workflow, arv-copy copies the workflow,
12 # input collections, and docker images). If
13 # --no-recursive is given, arv-copy copies only the single record
14 # identified by object-uuid.
16 # The user must have configuration files {src}.conf and
17 # {dst}.conf in a standard configuration directory with valid login credentials
18 # for instances src and dst. If either of these files is not found,
19 # arv-copy will issue an error.
41 import arvados.commands._util as arv_cmd
42 import arvados.commands.keepdocker
44 from arvados._internal import http_to_keep
45 from arvados._version import __version__
# Matches a full or abbreviated (1-40 hex characters) git commit hash.
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
# NOTE(review): the local_repo_dir assignment itself is elided from this
# excerpt; the dict is iterated later for shutil.rmtree cleanup.

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
# (module-global set by set_src_owner_uuid(); its initialization line is
# elided from this excerpt).
# Command-line option definitions shared by the arv-copy entry point.
# NOTE(review): this excerpt elides scattered source lines (possibly
# including an enclosing function header); code statements below are
# reproduced exactly as given.
copy_opts = argparse.ArgumentParser(add_help=False)

copy_opts.add_argument(
    '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
copy_opts.add_argument(
    '-v', '--verbose', dest='verbose', action='store_true',
    help='Verbose output.')
copy_opts.add_argument(
    '--progress', dest='progress', action='store_true',
    help='Report progress on copying collections. (default)')
copy_opts.add_argument(
    '--no-progress', dest='progress', action='store_false',
    help='Do not report progress on copying collections.')
copy_opts.add_argument(
    '-f', '--force', dest='force', action='store_true',
    help='Perform copy even if the object appears to exist at the remote destination.')
copy_opts.add_argument(
    '--src', dest='source_arvados',
    # NOTE(review): the opening of a (presumably triple-quoted) help=
    # string is elided from this excerpt; the following lines are the
    # body of that help text.
Client configuration location for the source Arvados cluster.
May be either a configuration file path, or a plain identifier like `foo`
to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
If not provided, will search for a configuration file named after the cluster ID of the source object UUID.
copy_opts.add_argument(
    '--dst', dest='destination_arvados',
    # NOTE(review): as above, the help= opening line is elided here.
Client configuration location for the destination Arvados cluster.
May be either a configuration file path, or a plain identifier like `foo`
to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
If not provided, will use the default client configuration from the environment or `settings.conf`.
copy_opts.add_argument(
    '--recursive', dest='recursive', action='store_true',
    help='Recursively copy any dependencies for this object, and subprojects. (default)')
copy_opts.add_argument(
    '--no-recursive', dest='recursive', action='store_false',
    help='Do not copy any dependencies or subprojects.')
copy_opts.add_argument(
    '--project-uuid', dest='project_uuid',
    help='The UUID of the project at the destination to which the collection or workflow should be copied.')
# --storage-classes lets the user pin which storage classes the
# destination cluster uses when saving the copied data blocks.
# Fix: corrected typo "destinaton" -> "destination" in the help text.
copy_opts.add_argument(
    '--storage-classes', dest='storage_classes',
    help='Comma separated list of storage classes to be used when saving data to the destination Arvados instance.')
copy_opts.add_argument("--varying-url-params", type=str, default="",
    help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")

copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
    help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")

copy_opts.add_argument(
    # (the positional argument name line is elided from this excerpt)
    help='The UUID of the object to be copied.')
copy_opts.set_defaults(progress=True)
copy_opts.set_defaults(recursive=True)

parser = argparse.ArgumentParser(
    description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
    parents=[copy_opts, arv_cmd.retry_opt])
args = parser.parse_args()

# Normalize --storage-classes into a list of non-empty class names.
if args.storage_classes:
    args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]

# (the if/else selecting between these two log levels is elided from
# this excerpt — presumably keyed on args.verbose; confirm)
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.INFO)

# If --src was not given, infer the source cluster from the first five
# characters (the cluster ID prefix) of the object UUID.
if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
    args.source_arvados = args.object_uuid[:5]

# Create API clients for the source and destination instances
src_arv = api_for_instance(args.source_arvados, args.retries)
dst_arv = api_for_instance(args.destination_arvados, args.retries)

# Default the destination project to the current user's home project.
if not args.project_uuid:
    args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

# Identify the kind of object we have been given, and begin copying.
t = uuid_type(src_arv, args.object_uuid)

# (the try: header framing the dispatch below is elided from this excerpt)
if t == 'Collection':
    set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
    result = copy_collection(args.object_uuid,
    # (the remaining arguments of this call are elided)
elif t == 'Workflow':
    set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
    result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
# (an elif branch header — presumably for projects/groups — is elided)
    set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
    result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
# (an elif branch header — presumably for HTTP URLs — is elided)
    result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
# (an else: branch header is elided)
    abort("cannot copy object {} of type {}".format(args.object_uuid, t))
except Exception as e:
    logger.error("%s", e, exc_info=args.verbose)
    # (error-exit lines elided)

# Clean up any outstanding temp git repositories.
for d in local_repo_dir.values():
    shutil.rmtree(d, ignore_errors=True)

# If no exception was thrown and the response does not have an
# error_token field, presume success
if result is None or 'error_token' in result or 'uuid' not in result:
    logger.error("API server returned an error result: {}".format(result))
    # (error-exit lines elided)

print(result['uuid'])
if result.get('partial_error'):
    logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
    # (partial-failure exit path elided)
logger.info("Success: created copy with uuid {}".format(result['uuid']))
def set_src_owner_uuid(resource, uuid, args):
    """Fetch the record for *uuid* from *resource* and stash its
    owner_uuid in the module-global src_owner_uuid."""
    global src_owner_uuid
    record = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = record.get("owner_uuid")
# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name.
#
# If instance_name contains a slash, it is presumed to be a path
# (either local or absolute) to a file with Arvados configuration
# credentials.
#
# Otherwise, it is presumed to be the name of a file in a standard
# configuration directory.
#
def api_for_instance(instance_name, num_retries):
    if not instance_name:
        # No name given: use the default client configuration from the
        # environment / settings.conf.
        return arvados.api('v1')

    if '/' in instance_name:
        config_file = instance_name
    # (an else: branch header is elided from this excerpt)
        dirs = arvados.util._BaseDirectories('CONFIG')
        config_file = next(dirs.search(f'{instance_name}.conf'), '')

    # (a try: header is elided here)
        cfg = arvados.config.load(config_file)
    # (except-handler lines are elided; `verb` and `e` used below are
    # presumably bound there — TODO confirm against the full source)
        config_file = f'{instance_name}.conf'
        abort(("Could not {} config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   verb, config_file, e.strerror, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        # (the assignment line binding api_is_insecure is elided; the
        # next two lines are the tail of that expression)
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             num_retries=num_retries,
        # (call closing and return of client elided)
    # (an else: branch header is elided)
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
# Check if git is available
def check_git_availability():
    # (a try: header and the subprocess.run( opening line are elided
    # from this excerpt)
        ['git', '--version'],
        stdout=subprocess.DEVNULL,
    # (remaining call arguments/closing elided)
    except FileNotFoundError:
        abort('git command is not available. Please ensure git is installed.')
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, str):
    # (the remainder of the body — the yield logic for both the string
    # and list cases — is elided from this excerpt)
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository. If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        # Simple equality match on the source repo: rewrite target.
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        # Single-element "in" match: rewrite the list.
        repo_filter[2] = [dst_repository]
    # (a final branch header line — presumably else: — is elided from
    # this excerpt)
        raise ValueError("repository filter is not a simple source match")
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes. If they're not, it raises ValueError to indicate that
    the filter is not portable. It could be extended to make other
    transformations in the future.
    """
    for operand in filter_iter(version_filter[2]):
        if not COMMIT_HASH_RE.match(operand):
            raise ValueError("script_version filter is not limited to commit hashes")
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    for name in filter_iter(filter_[0]):
        if (name == 'any') or (name in attr_names):
            return True
    return False
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    # (the try:/yield lines are elided from this excerpt)
    except exc_types as error:
        # (the remaining handler-invocation lines are elided)
# copy_workflow(wf_uuid, src, dst, args)
#
# Copies a workflow identified by wf_uuid from src to dst.
#
# If args.recursive is True, also copy any collections
# referenced in the workflow definition yaml.
#
# The owner_uuid of the new workflow is set to any given
# project_uuid or the user who copied the template.
#
# Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    if not wf["definition"]:
        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))

    # copy collections and docker images
    if args.recursive and wf["definition"]:
        # Invoke arvados-cwl-runner against the *source* cluster so it
        # can enumerate the workflow's Keep dependencies.
        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
               "ARVADOS_API_TOKEN": src.api_token,
               "PATH": os.environ["PATH"]}
        # (a try: header is elided from this excerpt)
        result = subprocess.run(
            ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
            stdout=subprocess.PIPE,
            universal_newlines=True,
            # (remaining run() arguments and call closing elided)
        except FileNotFoundError:
            # (handler lines elided)
        # Exit code 2 indicates the installed arvados-cwl-runner does
        # not support --print-keep-deps.
        no_arv_copy = result.returncode == 2

        # (a condition line is elided here)
            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
        elif result.returncode != 0:
            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')

        locations = json.loads(result.stdout)

        # (a condition line is elided here)
            copy_collections(locations, src, dst, args)

    # copy the workflow itself
    wf['owner_uuid'] = args.project_uuid

    # Create-or-update: reuse an existing workflow of the same name in
    # the destination project rather than creating a duplicate.
    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
                                             ["name", "=", wf["name"]]]).execute()
    if len(existing["items"]) == 0:
        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
    # (an else: branch header is elided)
        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
def workflow_collections(obj, locations, docker_images):
    """Recursively scan a workflow definition, appending Keep locations
    to *locations* and recording docker image name -> tag pairs in
    *docker_images* (both mutated in place)."""
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        # (a guard line is elided from this excerpt)
            if loc.startswith("keep:"):
                # Strip the "keep:" scheme prefix before recording.
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
        if docker_image is not None:
            # Split "name:tag"; default tag is 'latest'.
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds)==2 else 'latest'
            docker_images[ds[0]] = tag

        # (a loop header over the dict's keys is elided)
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        # (a loop header over the list is elided)
            workflow_collections(x, locations, docker_images)
# copy_collections(obj, src, dst, args)
#
# Recursively copies all collections referenced by 'obj' from src
# to dst. obj may be a dict or a list, in which case we run
# copy_collections on every value it contains. If it is a string,
# search it for any substring that matches a collection hash or uuid
# (this will find hidden references to collections like
#      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
# Returns a copy of obj with any old collection uuids replaced by
# the new ones.
#
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination. Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        # Memoized in module-global collections_copied so each collection
        # is copied at most once per session.
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            # (an else: branch header is elided from this excerpt)
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, str):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        # (a return line is elided here)
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
        # (the generator's for-clause and closing are elided)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    # (a fall-through return is elided)
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""
    # (initialization of total_bytes and locators_seen is elided from
    # this excerpt — presumably 0 and {}; TODO confirm)
    for line in manifest_text.splitlines():
        # (the line-splitting assignment binding `words` is elided)
        for word in words[1:]:
            # (a try: header is elided)
                loc = arvados.KeepLocator(word)
            # (the except handler line is elided)
                continue  # this word isn't a locator, skip it
            # Count each distinct block only once, keyed by md5.
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size
    # (the return line is elided)
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""
    collection_uuid = c['uuid']
    # Build the new record from a whitelist of fields on the source
    # record. (The body-dict initialization line is elided from this
    # excerpt.)
    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
        # (the per-field copy line is elided)

    # (a condition guarding the name fallback is elided)
        body['name'] = "copied from " + collection_uuid

    if args.storage_classes:
        body['storage_classes_desired'] = args.storage_classes

    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
# copy_collection(obj_uuid, src, dst, args)
#
# Copies the collection identified by obj_uuid from src to dst.
# Returns the collection object created at dst.
#
# If args.progress is True, produce a human-friendly progress
# report while blocks are copied.
#
# If a collection with the desired portable_data_hash already
# exists at dst, and args.force is False, copy_collection returns
# the existing collection without copying any blocks. Otherwise
# (if no collection exists or if args.force is True)
# copy_collection copies all of the collection data blocks from src
# to dst.
#
# For this application, it is critical to preserve the
# collection's manifest hash, which is not guaranteed with the
# arvados.CollectionReader and arvados.CollectionWriter classes.
# Copying each block in the collection manually, followed by
# the manifest block, ensures that the collection's manifest
# hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection. As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")

        # (empty-result handling lines are elided from this excerpt)
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
        # (lines elided)

        # (single-item shortcut lines elided)
            # There's only one collection with the PDH, so use that.

        # See if there is a collection that's in the same project
        # as the root item (usually a workflow) being copied.
        # (a loop header over items is elided)
            if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                # (selection lines elided)

        # Didn't find any collections located in the same project, so
        # pick the oldest collection that has a name assigned to it.
        # (selection lines elided)

        # None of the collections have names (?!), so just pick the
        # first one. (selection lines elided)

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same maninfest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    # (an else: branch header is elided)
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    # (the force-check condition line is elided)
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        # (an else-fallback is elided)
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    # (return of the matching record is elided)
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = io.StringIO()
    # (initialization of dst_locators / bytes_written is elided)

    bytes_expected = total_collection_size(manifest)
    # (the progress-writer selection condition is elided)
        progress_writer = ProgressWriter(human_progress)
    # (an else: branch header is elided)
        progress_writer = None

    # go through the words
    # put each block loc into 'get' queue
    # 'get' threads get block and put it into 'put' queue
    # 'put' threads put block and then update dst_locators
    #
    # after going through the whole manifest we go back through it
    # again and build dst_manifest

    lock = threading.Lock()

    # the get queue should be unbounded because we'll add all the
    # block hashes we want to get, but these are small
    get_queue = queue.Queue()

    # (threadcount initialization is elided)

    # the put queue contains full data blocks
    # and if 'get' is faster than 'put' we could end up consuming
    # a great deal of RAM if it isn't bounded.
    put_queue = queue.Queue(threadcount)
    # (transfer_error initialization is elided)

    # (the `def get_thread():` header and its loop header are elided)
            word = get_queue.get()
            # (a sentinel/termination check is elided)
                get_queue.task_done()
                # (lines elided)

            blockhash = arvados.KeepLocator(word).md5sum
            # (lock-acquisition framing is elided)
                if blockhash in dst_locators:
                    # Already transferred by another worker; skip.
                    get_queue.task_done()
                    # (lines elided)

            # (a try: header is elided)
                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))
            except Exception as e:
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                # Drain the 'get' queue so we end early
                # (drain-loop framing is elided)
                        get_queue.task_done()
                # (finally framing is elided)
                get_queue.task_done()

    # (the `def put_thread():` header is elided)
        nonlocal bytes_written
        # (the worker loop header is elided)
            item = put_queue.get()
            # (a sentinel/termination check is elided)
                put_queue.task_done()
                # (lines elided; the unpacking of item into word/data is
                # also elided)

            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum
            # (lock-acquisition framing is elided)
                if blockhash in dst_locators:
                    # Already uploaded; skip.
                    put_queue.task_done()
                    # (lines elided)

            # (a try: header is elided)
                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
                # (lock-acquisition framing is elided)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
                # (a progress_writer guard is elided)
                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
            except Exception as e:
                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
                # Drain the 'get' queue so we end early
                # (drain-loop framing is elided)
                        get_queue.task_done()
                # (lines elided)
                transfer_error.append(e)
                # (finally framing is elided)
                put_queue.task_done()

    # First pass: enqueue every block locator found in the manifest.
    for line in manifest.splitlines():
        # (the line-splitting assignment binding `words` is elided)
        for word in words[1:]:
            # (a try: header is elided)
                loc = arvados.KeepLocator(word)
            # (the except handler line is elided)
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                # (continue is elided)
            # (the get_queue.put call is elided)

    # (this loop presumably enqueues one shutdown sentinel per worker —
    # its body is elided; TODO confirm)
    for i in range(0, threadcount):
        # (body elided)

    for i in range(0, threadcount):
        threading.Thread(target=get_thread, daemon=True).start()

    for i in range(0, threadcount):
        threading.Thread(target=put_thread, daemon=True).start()

    # (queue join/wait calls are elided)

    if len(transfer_error) > 0:
        return {"error_token": "Failed to transfer blocks"}

    # Second pass: rebuild the manifest, substituting the dst-signed
    # locators recorded in dst_locators.
    for line in manifest.splitlines():
        # (the line-splitting assignment binding `words` is elided)
        dst_manifest.write(words[0])
        for word in words[1:]:
            # (a try: header is elided)
                loc = arvados.KeepLocator(word)
            # (the except handler line is elided)
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest.write(' ')
                dst_manifest.write(word)
                # (continue is elided)
            blockhash = loc.md5sum
            dst_manifest.write(' ')
            dst_manifest.write(dst_locators[blockhash])
        dst_manifest.write("\n")

    # (a progress_writer guard is elided)
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.
    """
    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        # The "image" is actually a raw Keep locator: copy it directly.
        dst_image_col = copy_collection(docker_image, src, dst, args)
    # (an else: branch header is elided from this excerpt)
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
def copy_project(obj_uuid, src, dst, owner_uuid, args):
    """Copy the project identified by obj_uuid (and, recursively, its
    collections, workflows and subprojects) from src to dst, under the
    destination project owner_uuid. Returns the destination project
    record; a "partial_error" key is attached when some contained items
    failed to copy."""
    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
    # (an else: branch header is elided from this excerpt)
        project_record = existing["items"][0]

    dst.groups().update(uuid=project_record["uuid"],
                        # (the body= opening of this call is elided)
                        "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    # (partial_error initialization and the try: framing for the
    # collection copy below are elided)
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
        # (remaining call arguments are elided)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy the workflows contained in the project.
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        # (a try: header is elided)
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    # Recurse into subprojects.
    for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
        # (a try: header is elided)
            copy_project(g["uuid"], src, dst, project_record["uuid"], args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    # (a condition guarding the partial_error attachment is elided)
        project_record["partial_error"] = partial_error

    return project_record
# git_rev_parse(rev, repo)
#
# Returns the 40-character commit hash corresponding to 'rev' in
# git repository 'repo' (which must be the path of a local git
# repository).
#
def git_rev_parse(rev, repo):
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        # (additional subprocess.run keyword arguments and the call
        # closing are elided from this excerpt)
        stdout=subprocess.PIPE,
    # NOTE(review): subprocess.run() returns a CompletedProcess whose
    # .stdout attribute is a str/bytes value, not a file object, so
    # .read() here looks like it would raise AttributeError — confirm
    # against the full source; if so this should be proc.stdout.strip().
    return proc.stdout.read().strip()
# uuid_type(api, object_uuid)
#
# Returns the name of the class that object_uuid belongs to, based on
# the second field of the uuid. This function consults the api's
# schema to identify the object class.
#
# It returns a string such as 'Collection', 'Workflow', etc.
#
# Special case: if handed a Keep locator hash, return 'Collection'.
#
def uuid_type(api, object_uuid):
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        # (the return for the Keep-locator special case is elided from
        # this excerpt)

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        # (the return for the HTTP-URL case is elided)

    p = object_uuid.split('-')
    # (a length/format guard is elided here)
    # Scan the API discovery schema for the class whose uuidPrefix
    # matches the UUID's second field.
    # NOTE(review): type_prefix is presumably bound from p on an elided
    # line — confirm against the full source.
    for k in api._schema.schemas:
        obj_class = api._schema.schemas[k].get('uuidPrefix', None)
        if type_prefix == obj_class:
            # (the return of k is elided)
def copy_from_http(url, src, dst, args):
    """Copy the contents of an HTTP URL into a Keep collection on dst,
    reusing a cached copy on the source cluster when one exists."""
    project_uuid = args.project_uuid
    varying_url_params = args.varying_url_params
    prefer_cached_downloads = args.prefer_cached_downloads

    # First check whether the URL is already cached on the *source*
    # cluster; if so, copy that collection instead of re-downloading.
    cached = http_to_keep.check_cached_url(src, project_uuid, url, {},
                                           varying_url_params=varying_url_params,
                                           prefer_cached_downloads=prefer_cached_downloads)
    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    # Otherwise download the URL directly into the destination cluster.
    cached = http_to_keep.http_to_keep(dst, project_uuid, url,
                                       varying_url_params=varying_url_params,
                                       prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}
    # (any trailing failure-return lines are elided from this excerpt)
def abort(msg, code=1):
    # Log the fatal message; presumably exits with `code` on an elided
    # following line — confirm against the full source.
    logger.info("arv-copy: %s", msg)
# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    # Machine-readable single-line progress record; -1 stands in for an
    # unknown expected size.
    return "{} {}: {} {} written {} total\n".format(
        # (the earlier format arguments are elided from this excerpt)
        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
    # Human-readable progress line; leading \r so successive reports
    # overwrite each other on the terminal.
    # (a condition on bytes_expected is elided from this excerpt)
        return "\r{}: {}M / {}M {:.1%} ".format(
            # (the first format argument is elided)
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    # (an else: branch header is elided)
        return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter(object):
    # Writes progress reports using a caller-supplied formatting
    # function (human_progress or machine_progress).
    _progress_func = None
    # NOTE(review): initialization of self.outfile (used by report/finish)
    # is elided from this excerpt — presumably a terminal stream; confirm
    # against the full source.

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            # (the self.outfile.write( opening of this call is elided)
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    # (the `def finish(self):` header is elided; this terminates the
    # in-place progress line with a newline)
        self.outfile.write("\n")
933 if __name__ == '__main__':