X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7feee6a3bbacc88a62faa35cf94cb0d9d1b04994..14db4dcdc2b0b7e5fbc81d62cf581dea5ccc07f4:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 6e45ec5765..1f8edb70db 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -6,6 +6,7 @@ # Implement cwl-runner interface for submitting and running work on Arvados, using # either the Crunch jobs API or Crunch containers API. +from future.utils import viewitems from builtins import str import argparse @@ -68,9 +69,9 @@ def versionstring(): def arg_parser(): # type: () -> argparse.ArgumentParser parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language') - parser.add_argument("--basedir", type=str, + parser.add_argument("--basedir", help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).") - parser.add_argument("--outdir", type=str, default=os.path.abspath('.'), + parser.add_argument("--outdir", default=os.path.abspath('.'), help="Output directory, default current directory") parser.add_argument("--eval-timeout", @@ -101,9 +102,9 @@ def arg_parser(): # type: () -> argparse.ArgumentParser default=True, dest="enable_reuse", help="Disable job or container reuse") - parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.") - parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None) - parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None) + parser.add_argument("--project-uuid", metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.") + parser.add_argument("--output-name", help="Name to use for collection that stores the final output.", default=None) + parser.add_argument("--output-tags", help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None) parser.add_argument("--ignore-docker-for-reuse", action="store_true", help="Ignore Docker image version when deciding whether to reuse past jobs.", default=False) @@ -130,7 +131,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines", default=True, dest="log_timestamps") - parser.add_argument("--api", type=str, + parser.add_argument("--api", default=None, dest="work_api", choices=("jobs", "containers"), help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.") @@ -143,7 +144,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser help="RAM (in MiB) required for the workflow runner job (default 1024)", default=None) - parser.add_argument("--submit-runner-image", type=str, + parser.add_argument("--submit-runner-image", help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__, default=None) @@ -152,11 +153,11 @@ def arg_parser(): # type: () -> argparse.ArgumentParser default=False) exgroup = parser.add_mutually_exclusive_group() - exgroup.add_argument("--submit-request-uuid", type=str, + exgroup.add_argument("--submit-request-uuid", default=None, help="Update and commit to supplied container request instead of creating a new one (containers API only).", metavar="UUID") - exgroup.add_argument("--submit-runner-cluster", type=str, + exgroup.add_argument("--submit-runner-cluster", help="Submit workflow runner to a remote cluster (containers API only)", default=None, metavar="CLUSTER_ID") @@ -165,7 +166,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser default=None, help="Collection cache size (in MiB, default 256).") - parser.add_argument("--name", type=str, + parser.add_argument("--name", help="Name to use for workflow execution instance.", default=None) @@ -177,7 +178,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser parser.add_argument("--enable-dev", action="store_true", help="Enable loading and running development versions " "of CWL spec.", default=False) - parser.add_argument('--storage-classes', default="default", type=str, + parser.add_argument('--storage-classes', default="default", help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.") parser.add_argument("--intermediate-output-ttl", type=int, metavar="N", @@ -210,7 +211,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser default=False, dest="trash_intermediate", help="Do not trash intermediate outputs (default).") - parser.add_argument("workflow", type=str, default=None, help="The workflow to execute") + parser.add_argument("workflow", default=None, help="The workflow to execute") parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.") return parser @@ -218,9 +219,15 @@ def arg_parser(): # type: () -> argparse.ArgumentParser def add_arv_hints(): cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*") cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE - res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml') - use_custom_schema("v1.0", "http://arvados.org/cwl", res.read()) - res.close() + res10 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.0.yml') + res11 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.1.yml') + customschema10 = res10.read() + customschema11 = res11.read() + use_custom_schema("v1.0", "http://arvados.org/cwl", customschema10) + use_custom_schema("v1.1.0-dev1", "http://arvados.org/cwl", customschema11) + use_custom_schema("v1.1", "http://arvados.org/cwl", customschema11) + res10.close() + res11.close() cwltool.process.supportedProcessRequirements.extend([ "http://arvados.org/cwl#RunInSingleContainer", "http://arvados.org/cwl#OutputDirType", @@ -273,7 +280,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None, add_arv_hints() - for key, val in list(cwltool.argparser.get_default_args().items()): + for key, val in viewitems(cwltool.argparser.get_default_args()): if not hasattr(arvargs, key): setattr(arvargs, key, val) @@ -288,10 +295,12 @@ def main(args, stdout, stderr, api_client=None, keep_client=None, if keep_client is None: keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4) executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4) - except Exception as e: - logger.error(e) + except Exception: + logger.exception("Error creating the Arvados CWL Executor") return 1 + # Note that unless in debug mode, some stack traces related to user + # workflow errors may be suppressed. See ArvadosJob.done(). if arvargs.debug: logger.setLevel(logging.DEBUG) logging.getLogger('arvados').setLevel(logging.DEBUG) @@ -312,6 +321,15 @@ def main(args, stdout, stderr, api_client=None, keep_client=None, else: arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s')) + if stdout is sys.stdout: + # cwltool.main has code to work around encoding issues with + # sys.stdout and unix pipes (they default to ASCII encoding, + # we want utf-8), so when stdout is sys.stdout set it to None + # to take advantage of that. Don't override it for all cases + # since we still want to be able to capture stdout for the + # unit tests. + stdout = None + return cwltool.main.main(args=arvargs, stdout=stdout, stderr=stderr,