from builtins import str
import argparse
+import importlib.metadata
+import importlib.resources
import logging
import os
import sys
import re
-import pkg_resources # part of setuptools
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
import arvados
import arvados.config
+import arvados.logging
from arvados.keep import KeepClient
from arvados.errors import ApiError
import arvados.commands._util as arv_cmd
-from arvados.api import OrderedJsonModel
from .perf import Perf
from ._version import __version__
from .executor import ArvCwlExecutor
+from .fsaccess import workflow_uuid_pattern
# These aren't used directly in this file but
# other code expects to import them from here
def versionstring():
"""Return a string naming sys.argv[0] and the installed versions of arvados-cwl-runner, arvados-python-client, and cwltool, for provenance and debugging."""
-
- arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
- arvpkg = pkg_resources.require("arvados-python-client")
- cwlpkg = pkg_resources.require("cwltool")
-
- return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
- "arvados-python-client", arvpkg[0].version,
- "cwltool", cwlpkg[0].version)
-
+ return "{} {}, arvados-python-client {}, cwltool {}".format(
+ sys.argv[0],
+ importlib.metadata.version('arvados-cwl-runner'),
+ importlib.metadata.version('arvados-python-client'),
+ importlib.metadata.version('cwltool'),
+ )
def arg_parser(): # type: () -> argparse.ArgumentParser
- parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
+ parser = argparse.ArgumentParser(
+ description='Arvados executor for Common Workflow Language',
+ parents=[arv_cmd.retry_opt],
+ )
parser.add_argument("--basedir",
help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
exgroup.add_argument("--create-workflow", action="store_true", help="Register an Arvados workflow that can be run from Workbench")
exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")
+ exgroup.add_argument("--print-keep-deps", action="store_true", help="To assist copying, print a list of Keep collections that this workflow depends on.")
+
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner, wait for completion.",
default=True, dest="wait")
help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
default=False)
+ parser.add_argument("--match-submitter-images", action="store_true",
+ default=False, dest="match_local_docker",
+ help="Where Arvados has more than one Docker image of the same name, use image from the Docker instance on the submitting node.")
+
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--submit-request-uuid",
default=None,
action="store_false", default=True,
help=argparse.SUPPRESS)
+ parser.add_argument("--disable-git", dest="git_info",
+ action="store_false", default=True,
+ help=argparse.SUPPRESS)
+
parser.add_argument("--disable-color", dest="enable_color",
action="store_false", default=True,
help=argparse.SUPPRESS)
action="store_true", default=False,
help=argparse.SUPPRESS)
+ parser.add_argument("--fast-parser", dest="fast_parser",
+ action="store_true", default=False,
+ help=argparse.SUPPRESS)
+
parser.add_argument("--thread-count", type=int,
default=0, help="Number of threads to use for job submit and output collection.")
parser.add_argument("--http-timeout", type=int,
default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
+ parser.add_argument("--defer-downloads", action="store_true", default=False,
+ help="When submitting a workflow, defer downloading HTTP URLs to workflow launch instead of downloading to Keep before submit.")
+
+ parser.add_argument("--varying-url-params", type=str, default="",
+ help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+ parser.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+ help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
+
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.")
+ exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.")
+
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.")
+ exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave dependencies where they are.")
+
parser.add_argument(
"--skip-schemas",
action="store_true",
cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
supported_versions = ["v1.0", "v1.1", "v1.2"]
for s in supported_versions:
- res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-%s.yml' % s)
- customschema = res.read().decode('utf-8')
+ customschema = importlib.resources.read_text(__name__, f'arv-cwl-schema-{s}.yml', 'utf-8')
use_custom_schema(s, "http://arvados.org/cwl", customschema)
- res.close()
cwltool.process.supportedProcessRequirements.extend([
"http://arvados.org/cwl#RunInSingleContainer",
"http://arvados.org/cwl#OutputDirType",
"http://arvados.org/cwl#IntermediateOutput",
"http://arvados.org/cwl#ReuseRequirement",
"http://arvados.org/cwl#ClusterTarget",
- "http://arvados.org/cwl#OutputStorageClass"
+ "http://arvados.org/cwl#OutputStorageClass",
+ "http://arvados.org/cwl#ProcessProperties",
+ "http://commonwl.org/cwltool#CUDARequirement",
+ "http://arvados.org/cwl#UsePreemptible",
+ "http://arvados.org/cwl#OutputCollectionProperties",
+ "http://arvados.org/cwl#KeepCacheTypeRequirement",
+ "http://arvados.org/cwl#OutOfMemoryRetry",
])
def exit_signal_handler(sigcode, frame):
    """Log the received signal number as an error, then exit the process.

    Takes a signal number and a stack frame (presumably so it can be
    installed with ``signal.signal`` -- confirm at the call site); ``frame``
    is accepted but never used. The status handed to ``sys.exit`` is the
    negated signal number.
    """
    template = str(u"Caught signal {}, exiting.")
    logger.error(template.format(sigcode))
    exit_status = -sigcode
    sys.exit(exit_status)
-def main(args, stdout, stderr, api_client=None, keep_client=None,
+def main(args=sys.argv[1:],
+ stdout=sys.stdout,
+ stderr=sys.stderr,
+ api_client=None,
+ keep_client=None,
install_sig_handlers=True):
parser = arg_parser()
return 1
arvargs.work_api = want_api
- if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+ workflow_op = arvargs.create_workflow or arvargs.update_workflow or arvargs.print_keep_deps
+
+ if workflow_op and not arvargs.job_order:
job_order_object = ({}, "")
add_arv_hints()
try:
if api_client is None:
api_client = arvados.safeapi.ThreadSafeApiCache(
- api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
- keep_params={"num_retries": 4})
+ api_params={
+ 'num_retries': arvargs.retries,
+ 'timeout': arvargs.http_timeout,
+ },
+ keep_params={
+ 'num_retries': arvargs.retries,
+ },
+ version='v1',
+ )
keep_client = api_client.keep
# Make an API object now so errors are reported early.
api_client.users().current().execute()
if keep_client is None:
- keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
- executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True)
+ keep_client = arvados.keep.KeepClient(
+ api_client=api_client,
+ block_cache=block_cache,
+ num_retries=arvargs.retries,
+ )
+ executor = ArvCwlExecutor(
+ api_client,
+ arvargs,
+ keep_client=keep_client,
+ num_retries=arvargs.retries,
+ stdout=stdout,
+ )
except WorkflowException as e:
logger.error(e, exc_info=(sys.exc_info()[1] if arvargs.debug else False))
return 1
# Note that unless in debug mode, some stack traces related to user
# workflow errors may be suppressed.
+
+ # Set the log level of most modules to INFO (instead of the default, which is WARNING)
+ logger.setLevel(logging.INFO)
+ logging.getLogger('arvados').setLevel(logging.INFO)
+ logging.getLogger('arvados.keep').setLevel(logging.WARNING)
+ # API retries are filtered to the INFO level and can be noisy, but as long as
+ # they succeed we don't need to see warnings about them.
+ googleapiclient_http_logger = logging.getLogger('googleapiclient.http')
+ googleapiclient_http_logger.addFilter(arvados.logging.GoogleHTTPClientFilter())
+ googleapiclient_http_logger.setLevel(logging.WARNING)
+
if arvargs.debug:
logger.setLevel(logging.DEBUG)
logging.getLogger('arvados').setLevel(logging.DEBUG)
+ # In debug mode show logs about retries, but we aren't
+ # debugging the google client so we don't need to see
+ # everything.
+ googleapiclient_http_logger.setLevel(logging.NOTSET)
+ logging.getLogger('googleapiclient').setLevel(logging.INFO)
if arvargs.quiet:
logger.setLevel(logging.WARN)
# unit tests.
stdout = None
+ executor.loadingContext.default_docker_image = arvargs.submit_runner_image or "arvados/jobs:"+__version__
+
+ if arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow) or arvargs.workflow.startswith("keep:"):
+ executor.loadingContext.do_validate = False
+ if arvargs.submit and not workflow_op:
+ executor.fast_submit = True
+
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
logger_handler=arvados.log_handler,
custom_schema_callback=add_arv_hints,
loadingContext=executor.loadingContext,
- runtimeContext=executor.runtimeContext,
- input_required=not (arvargs.create_workflow or arvargs.update_workflow))
+ runtimeContext=executor.toplevel_runtimeContext,
+ input_required=not workflow_op)