X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3fa6aa4043286ad61e5f29c136d3cc2942e8750d..7fec33bab2fb68405a1c641d3cd956d21487e14b:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 08a05d571c..fd3b7a5d16 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -28,14 +28,15 @@ from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing import arvados import arvados.config +import arvados.logging from arvados.keep import KeepClient from arvados.errors import ApiError import arvados.commands._util as arv_cmd -from arvados.api import OrderedJsonModel from .perf import Perf from ._version import __version__ from .executor import ArvCwlExecutor +from .fsaccess import workflow_uuid_pattern # These aren't used directly in this file but # other code expects to import them from here @@ -67,7 +68,10 @@ def versionstring(): def arg_parser(): # type: () -> argparse.ArgumentParser - parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language') + parser = argparse.ArgumentParser( + description='Arvados executor for Common Workflow Language', + parents=[arv_cmd.retry_opt], + ) parser.add_argument("--basedir", help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).") @@ -119,6 +123,8 @@ def arg_parser(): # type: () -> argparse.ArgumentParser exgroup.add_argument("--create-workflow", action="store_true", help="Register an Arvados workflow that can be run from Workbench") exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow with the given UUID.") + exgroup.add_argument("--print-keep-deps", action="store_true", help="To assist copying, print a list of Keep collections that this workflow depends on.") + exgroup = parser.add_mutually_exclusive_group() 
exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner, wait for completion.", default=True, dest="wait") @@ -199,6 +205,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser action="store_false", default=True, help=argparse.SUPPRESS) + parser.add_argument("--disable-git", dest="git_info", + action="store_false", default=True, + help=argparse.SUPPRESS) + parser.add_argument("--disable-color", dest="enable_color", action="store_false", default=True, help=argparse.SUPPRESS) @@ -207,12 +217,25 @@ def arg_parser(): # type: () -> argparse.ArgumentParser action="store_true", default=False, help=argparse.SUPPRESS) + parser.add_argument("--fast-parser", dest="fast_parser", + action="store_true", default=False, + help=argparse.SUPPRESS) + parser.add_argument("--thread-count", type=int, default=0, help="Number of threads to use for job submit and output collection.") parser.add_argument("--http-timeout", type=int, default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).") + parser.add_argument("--defer-downloads", action="store_true", default=False, + help="When submitting a workflow, defer downloading HTTP URLs to workflow launch instead of downloading to Keep before submit.") + + parser.add_argument("--varying-url-params", type=str, default="", + help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.") + + parser.add_argument("--prefer-cached-downloads", action="store_true", default=False, + help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).") + exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. 
Control individual steps with arv:UsePreemptible hint.") exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.") @@ -266,6 +289,8 @@ def add_arv_hints(): "http://commonwl.org/cwltool#CUDARequirement", "http://arvados.org/cwl#UsePreemptible", "http://arvados.org/cwl#OutputCollectionProperties", + "http://arvados.org/cwl#KeepCacheTypeRequirement", + "http://arvados.org/cwl#OutOfMemoryRetry", ]) def exit_signal_handler(sigcode, frame): @@ -301,7 +326,9 @@ def main(args=sys.argv[1:], return 1 arvargs.work_api = want_api - if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order: + workflow_op = arvargs.create_workflow or arvargs.update_workflow or arvargs.print_keep_deps + + if workflow_op and not arvargs.job_order: job_order_object = ({}, "") add_arv_hints() @@ -313,14 +340,32 @@ def main(args=sys.argv[1:], try: if api_client is None: api_client = arvados.safeapi.ThreadSafeApiCache( - api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout}, - keep_params={"num_retries": 4}) + api_params={ + 'num_retries': arvargs.retries, + 'timeout': arvargs.http_timeout, + }, + keep_params={ + 'num_retries': arvargs.retries, + }, + version='v1', + ) keep_client = api_client.keep # Make an API object now so errors are reported early. 
api_client.users().current().execute() if keep_client is None: - keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4) - executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4, stdout=stdout) + block_cache = arvados.keep.KeepBlockCache(disk_cache=True) + keep_client = arvados.keep.KeepClient( + api_client=api_client, + block_cache=block_cache, + num_retries=arvargs.retries, + ) + executor = ArvCwlExecutor( + api_client, + arvargs, + keep_client=keep_client, + num_retries=arvargs.retries, + stdout=stdout, + ) except WorkflowException as e: logger.error(e, exc_info=(sys.exc_info()[1] if arvargs.debug else False)) return 1 @@ -330,9 +375,25 @@ def main(args=sys.argv[1:], # Note that unless in debug mode, some stack traces related to user # workflow errors may be suppressed. + + # Set the logging on most modules to INFO (instead of default which is WARNING) + logger.setLevel(logging.INFO) + logging.getLogger('arvados').setLevel(logging.INFO) + logging.getLogger('arvados.keep').setLevel(logging.WARNING) + # API retries are filtered to the INFO level and can be noisy, but as long as + # they succeed we don't need to see warnings about it. + googleapiclient_http_logger = logging.getLogger('googleapiclient.http') + googleapiclient_http_logger.addFilter(arvados.logging.GoogleHTTPClientFilter()) + googleapiclient_http_logger.setLevel(logging.WARNING) + if arvargs.debug: logger.setLevel(logging.DEBUG) logging.getLogger('arvados').setLevel(logging.DEBUG) + # In debug mode show logs about retries, but we aren't + # debugging the google client so we don't need to see + # everything. + googleapiclient_http_logger.setLevel(logging.NOTSET) + logging.getLogger('googleapiclient').setLevel(logging.INFO) if arvargs.quiet: logger.setLevel(logging.WARN) @@ -359,6 +420,13 @@ def main(args=sys.argv[1:], # unit tests. 
stdout = None + executor.loadingContext.default_docker_image = arvargs.submit_runner_image or "arvados/jobs:"+__version__ + + if arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow) or arvargs.workflow.startswith("keep:"): + executor.loadingContext.do_validate = False + if arvargs.submit and not workflow_op: + executor.fast_submit = True + return cwltool.main.main(args=arvargs, stdout=stdout, stderr=stderr, @@ -369,4 +437,4 @@ def main(args=sys.argv[1:], custom_schema_callback=add_arv_hints, loadingContext=executor.loadingContext, runtimeContext=executor.toplevel_runtimeContext, - input_required=not (arvargs.create_workflow or arvargs.update_workflow)) + input_required=not workflow_op)