# instances src and dst. If either of these files is not found,
# arv-copy will issue an error.
-from __future__ import division
-from future import standard_library
-from future.utils import listvalues
-standard_library.install_aliases()
-from past.builtins import basestring
-from builtins import object
import argparse
import contextlib
import getpass
import os
import re
import shutil
+import subprocess
import sys
import logging
import tempfile
import urllib.parse
import io
+import json
+import queue
+import threading
import arvados
import arvados.config
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
-import ruamel.yaml as yaml
+import arvados.http_to_keep
-from arvados.api import OrderedJsonModel
from arvados._version import __version__
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
help='Perform copy even if the object appears to exist at the remote destination.')
copy_opts.add_argument(
'--src', dest='source_arvados',
- help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
+ help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will be inferred from the UUID of the object being copied.')
copy_opts.add_argument(
'--dst', dest='destination_arvados',
- help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
+ help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will use ARVADOS_API_HOST from environment.')
copy_opts.add_argument(
'--recursive', dest='recursive', action='store_true',
help='Recursively copy any dependencies for this object, and subprojects. (default)')
copy_opts.add_argument(
'--storage-classes', dest='storage_classes',
help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
+ copy_opts.add_argument("--varying-url-params", type=str, default="",
+ help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+ copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+ help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
copy_opts.add_argument(
'object_uuid',
copy_opts.set_defaults(recursive=True)
parser = argparse.ArgumentParser(
- description='Copy a workflow or collection from one Arvados instance to another.',
+ description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
parents=[copy_opts, arv_cmd.retry_opt])
args = parser.parse_args()
else:
logger.setLevel(logging.INFO)
- if not args.source_arvados:
+ if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
args.source_arvados = args.object_uuid[:5]
# Create API clients for the source and destination instances
- src_arv = api_for_instance(args.source_arvados)
- dst_arv = api_for_instance(args.destination_arvados)
+ src_arv = api_for_instance(args.source_arvados, args.retries)
+ dst_arv = api_for_instance(args.destination_arvados, args.retries)
if not args.project_uuid:
args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
# Identify the kind of object we have been given, and begin copying.
t = uuid_type(src_arv, args.object_uuid)
- if t == 'Collection':
- set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
- result = copy_collection(args.object_uuid,
- src_arv, dst_arv,
- args)
- elif t == 'Workflow':
- set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
- result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
- elif t == 'Group':
- set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
- result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
- else:
- abort("cannot copy object {} of type {}".format(args.object_uuid, t))
+
+ try:
+ if t == 'Collection':
+ set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
+ result = copy_collection(args.object_uuid,
+ src_arv, dst_arv,
+ args)
+ elif t == 'Workflow':
+ set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
+ result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
+ elif t == 'Group':
+ set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
+ result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
+ elif t == 'httpURL':
+ result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
+ else:
+ abort("cannot copy object {} of type {}".format(args.object_uuid, t))
+ except Exception as e:
+ logger.error("%s", e, exc_info=args.verbose)
+ exit(1)
# Clean up any outstanding temp git repositories.
- for d in listvalues(local_repo_dir):
+ for d in local_repo_dir.values():
shutil.rmtree(d, ignore_errors=True)
+ if not result:
+ exit(1)
+
# If no exception was thrown and the response does not have an
# error_token field, presume success
- if 'error_token' in result or 'uuid' not in result:
- logger.error("API server returned an error result: {}".format(result))
+ if result is None or 'error_token' in result or 'uuid' not in result:
+ if result:
+ logger.error("API server returned an error result: {}".format(result))
+ exit(1)
+
+ print(result['uuid'])
+
+ if result.get('partial_error'):
+ logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
exit(1)
- logger.info("")
logger.info("Success: created copy with uuid {}".format(result['uuid']))
exit(0)
# Otherwise, it is presumed to be the name of a file in
# $HOME/.config/arvados/instance_name.conf
#
-def api_for_instance(instance_name):
+def api_for_instance(instance_name, num_retries):
if not instance_name:
# Use environment
- return arvados.api('v1', model=OrderedJsonModel())
+ return arvados.api('v1')
if '/' in instance_name:
config_file = instance_name
host=cfg['ARVADOS_API_HOST'],
token=cfg['ARVADOS_API_TOKEN'],
insecure=api_is_insecure,
- model=OrderedJsonModel())
+ num_retries=num_retries,
+ )
else:
abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
return client
# Check if git is available
def check_git_availability():
+    """Abort the program unless `git --version` can be run successfully.
+
+    git's standard output is discarded; only whether the command could
+    be started and exited with status 0 matters.
+    """
    try:
- arvados.util.run_command(['git', '--help'])
- except Exception:
+        subprocess.run(
+            ['git', '--version'],
+            check=True,
+            stdout=subprocess.DEVNULL,
+        )
+    # OSError covers a missing binary (FileNotFoundError) as well as a
+    # non-executable one (PermissionError); check=True additionally raises
+    # CalledProcessError when git exits non-zero.  All of them mean git is
+    # unusable, so report that instead of letting a traceback escape.
+    except (OSError, subprocess.CalledProcessError):
        abort('git command is not available. Please ensure git is installed.')
Pass in a filter field that can either be a string or list.
This will iterate elements as if the field had been written as a list.
"""
- if isinstance(arg, basestring):
- return iter((arg,))
+ if isinstance(arg, str):
+ yield arg
else:
- return iter(arg)
+ yield from arg
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
"""Update a single repository filter in-place for the destination.
# fetch the workflow from the source instance
wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
+ if not wf["definition"]:
+ logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
+
# copy collections and docker images
- if args.recursive:
- wf_def = yaml.safe_load(wf["definition"])
- if wf_def is not None:
- locations = []
- docker_images = {}
- graph = wf_def.get('$graph', None)
- if graph is not None:
- workflow_collections(graph, locations, docker_images)
- else:
- workflow_collections(wf_def, locations, docker_images)
+ if args.recursive and wf["definition"]:
+ env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
+ "ARVADOS_API_TOKEN": src.api_token,
+ "PATH": os.environ["PATH"]}
+ try:
+ result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
+ capture_output=True, env=env)
+ except FileNotFoundError:
+ no_arv_copy = True
+ else:
+ no_arv_copy = result.returncode == 2
+
+ if no_arv_copy:
+ raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
+ elif result.returncode != 0:
+ raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')
- if locations:
- copy_collections(locations, src, dst, args)
+ locations = json.loads(result.stdout)
- for image in docker_images:
- copy_docker_image(image, docker_images[image], src, dst, args)
+ if locations:
+ copy_collections(locations, src, dst, args)
# copy the workflow itself
del wf['uuid']
collections_copied[src_id] = dst_col['uuid']
return collections_copied[src_id]
- if isinstance(obj, basestring):
+ if isinstance(obj, str):
# Copy any collections identified in this string to dst, replacing
# them with the dst uuids as necessary.
obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
else:
progress_writer = None
+ # go through the words
+ # put each block loc into 'get' queue
+ # 'get' threads get block and put it into 'put' queue
+ # 'put' threads put block and then update dst_locators
+ #
+ # after going through the whole manifest we go back through it
+ # again and build dst_manifest
+
+ lock = threading.Lock()
+
+ # the get queue should be unbounded because we'll add all the
+ # block hashes we want to get, but these are small
+ get_queue = queue.Queue()
+
+ threadcount = 4
+
+ # the put queue contains full data blocks
+ # and if 'get' is faster than 'put' we could end up consuming
+ # a great deal of RAM if it isn't bounded.
+ put_queue = queue.Queue(threadcount)
+ transfer_error = []
+
+ def get_thread():
+     """Worker: fetch blocks named on get_queue and pass them to put_queue.
+
+     Loops until it dequeues a None sentinel, which it forwards to
+     put_queue before returning.  Blocks whose md5 is already present in
+     dst_locators are skipped.  Every item taken from get_queue is
+     matched by exactly one get_queue.task_done() call.  A failed fetch
+     is logged and the exception is appended to transfer_error.
+     """
+     while True:
+         word = get_queue.get()
+         if word is None:
+             put_queue.put(None)
+             get_queue.task_done()
+             return
+
+         blockhash = arvados.KeepLocator(word).md5sum
+         with lock:
+             if blockhash in dst_locators:
+                 # Already uploaded
+                 get_queue.task_done()
+                 continue
+
+         try:
+             logger.debug("Getting block %s", word)
+             data = src_keep.get(word)
+             put_queue.put((word, data))
+         # Must bind the exception with `as e`: a bare `except e:` looks up
+         # the unbound name `e` and raises NameError, so the failure would
+         # never reach transfer_error.
+         except Exception as e:
+             logger.error("Error getting block %s: %s", word, e)
+             transfer_error.append(e)
+             try:
+                 # Drain the 'get' queue so we end early
+                 while True:
+                     get_queue.get(False)
+                     get_queue.task_done()
+             except queue.Empty:
+                 pass
+         finally:
+             # Accounts for `word` itself on both the success and error paths.
+             get_queue.task_done()
+
+ def put_thread():
+     """Worker: upload (word, data) pairs taken from put_queue to dst_keep.
+
+     Loops until it dequeues a None sentinel.  On a successful put it
+     records the new locator in dst_locators under the block's md5, adds
+     the block size to bytes_written and, if progress_writer is set,
+     reports progress, all while holding `lock`.  Every item taken from
+     put_queue is matched by exactly one put_queue.task_done() call.  A
+     failed put is logged and the exception is appended to
+     transfer_error.
+     """
+     nonlocal bytes_written
+     while True:
+         item = put_queue.get()
+         if item is None:
+             put_queue.task_done()
+             return
+
+         word, data = item
+         loc = arvados.KeepLocator(word)
+         blockhash = loc.md5sum
+         with lock:
+             if blockhash in dst_locators:
+                 # Already uploaded
+                 put_queue.task_done()
+                 continue
+
+         try:
+             logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
+             dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+             with lock:
+                 dst_locators[blockhash] = dst_locator
+                 bytes_written += loc.size
+                 if progress_writer:
+                     progress_writer.report(obj_uuid, bytes_written, bytes_expected)
+         # Must bind the exception with `as e`: a bare `except e:` looks up
+         # the unbound name `e` and raises NameError, so the failure would
+         # never reach transfer_error.
+         except Exception as e:
+             logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
+             try:
+                 # Drain the 'get' queue so we end early
+                 while True:
+                     get_queue.get(False)
+                     get_queue.task_done()
+             except queue.Empty:
+                 pass
+             transfer_error.append(e)
+         finally:
+             # Accounts for `item` itself on both the success and error paths.
+             put_queue.task_done()
+
+ for line in manifest.splitlines():
+ words = line.split()
+ for word in words[1:]:
+ try:
+ loc = arvados.KeepLocator(word)
+ except ValueError:
+ # If 'word' can't be parsed as a locator,
+ # presume it's a filename.
+ continue
+
+ get_queue.put(word)
+
+ for i in range(0, threadcount):
+ get_queue.put(None)
+
+ for i in range(0, threadcount):
+ threading.Thread(target=get_thread, daemon=True).start()
+
+ for i in range(0, threadcount):
+ threading.Thread(target=put_thread, daemon=True).start()
+
+ get_queue.join()
+ put_queue.join()
+
+ if len(transfer_error) > 0:
+ return {"error_token": "Failed to transfer blocks"}
+
for line in manifest.splitlines():
words = line.split()
dst_manifest.write(words[0])
dst_manifest.write(word)
continue
blockhash = loc.md5sum
- # copy this block if we haven't seen it before
- # (otherwise, just reuse the existing dst_locator)
- if blockhash not in dst_locators:
- logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
- if progress_writer:
- progress_writer.report(obj_uuid, bytes_written, bytes_expected)
- data = src_keep.get(word)
- dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
- dst_locators[blockhash] = dst_locator
- bytes_written += loc.size
dst_manifest.write(' ')
dst_manifest.write(dst_locators[blockhash])
dst_manifest.write("\n")
c['manifest_text'] = dst_manifest.getvalue()
return create_collection_from(c, src, dst, args)
-def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
- r = api.repositories().list(
- filters=[['name', '=', repo_name]]).execute(num_retries=retries)
- if r['items_available'] != 1:
- raise Exception('cannot identify repo {}; {} repos found'
- .format(repo_name, r['items_available']))
-
- https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
- http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
- other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
-
- priority = https_url + other_url + http_url
-
- git_config = []
- git_url = None
- for url in priority:
- if url.startswith("http"):
- u = urllib.parse.urlsplit(url)
- baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
- git_config = ["-c", "credential.%s/.username=none" % baseurl,
- "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
- else:
- git_config = []
-
- try:
- logger.debug("trying %s", url)
- arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
- env={"HOME": os.environ["HOME"],
- "ARVADOS_API_TOKEN": api.api_token,
- "GIT_ASKPASS": "/bin/false"})
- except arvados.errors.CommandFailedError:
- pass
- else:
- git_url = url
- break
-
- if not git_url:
- raise Exception('Cannot access git repository, tried {}'
- .format(priority))
-
- if git_url.startswith("http:"):
- if allow_insecure_http:
- logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
- else:
- raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
-
- return (git_url, git_config)
-
-
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
"""Copy the docker image identified by docker_image and
docker_image_tag from src to dst. Create appropriate
logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
+
+ partial_error = ""
+
# Copy collections
- copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
- src, dst, args)
+ try:
+ copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
+ src, dst, args)
+ except Exception as e:
+ partial_error += "\n" + str(e)
# Copy workflows
- for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
- copy_workflow(w["uuid"], src, dst, args)
+ for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
+ try:
+ copy_workflow(w["uuid"], src, dst, args)
+ except Exception as e:
+ partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
if args.recursive:
- for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
- copy_project(g["uuid"], src, dst, project_record["uuid"], args)
+ for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
+ try:
+ copy_project(g["uuid"], src, dst, project_record["uuid"], args)
+ except Exception as e:
+ partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
+
+ project_record["partial_error"] = partial_error
return project_record
# repository)
#
def git_rev_parse(rev, repo):
- gitout, giterr = arvados.util.run_command(
- ['git', 'rev-parse', rev], cwd=repo)
- return gitout.strip()
+    """Return the stripped output of `git rev-parse <rev>` run inside `repo`.
+
+    Raises subprocess.CalledProcessError if git exits non-zero, because
+    the command is run with check=True.
+    """
+    proc = subprocess.run(
+        ['git', 'rev-parse', rev],
+        check=True,
+        cwd=repo,
+        stdout=subprocess.PIPE,
+        text=True,
+    )
+    # With text=True, CompletedProcess.stdout is already a str, not a file
+    # object, so calling .read() on it would raise AttributeError.
+    return proc.stdout.strip()
# uuid_type(api, object_uuid)
#
def uuid_type(api, object_uuid):
if re.match(arvados.util.keep_locator_pattern, object_uuid):
return 'Collection'
+
+ if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
+ return 'httpURL'
+
p = object_uuid.split('-')
if len(p) == 3:
type_prefix = p[1]
return k
return None
+
+def copy_from_http(url, src, dst, args):
+    """Copy the content behind an HTTP(S) URL to the destination instance.
+
+    First calls arvados.http_to_keep.check_cached_url() against `src`.
+    If the third element of its result is not None, that value is
+    handed to copy_collection() and its result is returned.  Otherwise
+    arvados.http_to_keep.http_to_keep() is called against `dst`.
+
+    Returns copy_collection()'s result, or {"uuid": <third element of
+    the http_to_keep() result>}, or None when http_to_keep() returns
+    None.
+    """
+    project = args.project_uuid
+    url_opts = {
+        "varying_url_params": args.varying_url_params,
+        "prefer_cached_downloads": args.prefer_cached_downloads,
+    }
+
+    lookup = arvados.http_to_keep.check_cached_url(src, project, url, {}, **url_opts)
+    if lookup[2] is not None:
+        return copy_collection(lookup[2], src, dst, args)
+
+    fetched = arvados.http_to_keep.http_to_keep(dst, project, url, **url_opts)
+    if fetched is None:
+        return None
+    return {"uuid": fetched[2]}
+
+
def abort(msg, code=1):
+ # Log msg through `logger` at INFO level, prefixed with "arv-copy: ",
+ # then call exit(code); code defaults to 1.
logger.info("arv-copy: %s", msg)
exit(code)