-#!/usr/bin/env python
+#!/usr/bin/env python3
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
# Implement cwl-runner interface for submitting and running work on Arvados, using
-# either the Crunch jobs API or Crunch containers API.
+# the Crunch containers API.
+
+from future.utils import viewitems
+from builtins import str
import argparse
import logging
import cwltool.process
import cwltool.argparser
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
+from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing
import arvados
import arvados.config
from ._version import __version__
from .executor import ArvCwlExecutor
-# These arn't used directly in this file but
+# These aren't used directly in this file but
# other code expects to import them from here
from .arvcontainer import ArvadosContainer
-from .arvjob import ArvadosJob
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
from .util import get_current_container
def arg_parser(): # type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
- parser.add_argument("--basedir", type=str,
+ parser.add_argument("--basedir",
help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
- parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
+ parser.add_argument("--outdir", default=os.path.abspath('.'),
help="Output directory, default current directory")
parser.add_argument("--eval-timeout",
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--enable-reuse", action="store_true",
default=True, dest="enable_reuse",
- help="Enable job or container reuse (default)")
+ help="Enable container reuse (default)")
exgroup.add_argument("--disable-reuse", action="store_false",
default=True, dest="enable_reuse",
- help="Disable job or container reuse")
+ help="Disable container reuse")
- parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
- parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
- parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
+ parser.add_argument("--project-uuid", metavar="UUID", help="Project that will own the workflow containers, if not provided, will go to home project.")
+ parser.add_argument("--output-name", help="Name to use for collection that stores the final output.", default=None)
+ parser.add_argument("--output-tags", help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
parser.add_argument("--ignore-docker-for-reuse", action="store_true",
- help="Ignore Docker image version when deciding whether to reuse past jobs.",
+ help="Ignore Docker image version when deciding whether to reuse past containers.",
default=False)
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
default=True, dest="submit")
- exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
+ exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits containers to Arvados).",
default=True, dest="submit")
exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
dest="create_workflow")
- exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
- exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
+ exgroup.add_argument("--create-workflow", action="store_true", help="Register an Arvados workflow that can be run from Workbench")
+ exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")
exgroup = parser.add_mutually_exclusive_group()
- exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
+ exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner, wait for completion.",
default=True, dest="wait")
- exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
+ exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner and exit.",
default=True, dest="wait")
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
default=True, dest="log_timestamps")
- parser.add_argument("--api", type=str,
+ parser.add_argument("--api",
default=None, dest="work_api",
- choices=("jobs", "containers"),
- help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")
+ choices=("containers",),
+ help="Select work submission API. Only supports 'containers'")
parser.add_argument("--compute-checksum", action="store_true", default=False,
help="Compute checksum of contents while collecting outputs",
help="RAM (in MiB) required for the workflow runner job (default 1024)",
default=None)
- parser.add_argument("--submit-runner-image", type=str,
+ parser.add_argument("--submit-runner-image",
help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
default=None)
default=False)
exgroup = parser.add_mutually_exclusive_group()
- exgroup.add_argument("--submit-request-uuid", type=str,
+ exgroup.add_argument("--submit-request-uuid",
default=None,
- help="Update and commit to supplied container request instead of creating a new one (containers API only).",
+ help="Update and commit to supplied container request instead of creating a new one.",
metavar="UUID")
- exgroup.add_argument("--submit-runner-cluster", type=str,
- help="Submit workflow runner to a remote cluster (containers API only)",
+ exgroup.add_argument("--submit-runner-cluster",
+ help="Submit workflow runner to a remote cluster",
default=None,
metavar="CLUSTER_ID")
default=None,
help="Collection cache size (in MiB, default 256).")
- parser.add_argument("--name", type=str,
+ parser.add_argument("--name",
help="Name to use for workflow execution instance.",
default=None)
parser.add_argument("--enable-dev", action="store_true",
help="Enable loading and running development versions "
- "of CWL spec.", default=False)
- parser.add_argument('--storage-classes', default="default", type=str,
+ "of the CWL standards.", default=False)
+ parser.add_argument('--storage-classes', default="default",
help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
default=0)
parser.add_argument("--priority", type=int,
- help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
+ help="Workflow priority (range 1..1000, higher has precedence over lower)",
default=DEFAULT_PRIORITY)
parser.add_argument("--disable-validate", dest="do_validate",
action="store_false", default=True,
help=argparse.SUPPRESS)
+ parser.add_argument("--disable-color", dest="enable_color",
+ action="store_false", default=True,
+ help=argparse.SUPPRESS)
+
parser.add_argument("--disable-js-validation",
action="store_true", default=False,
help=argparse.SUPPRESS)
parser.add_argument("--http-timeout", type=int,
default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
+ parser.add_argument(
+ "--skip-schemas",
+ action="store_true",
+ help="Skip loading of schemas",
+ default=False,
+ dest="skip_schemas",
+ )
+
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--trash-intermediate", action="store_true",
default=False, dest="trash_intermediate",
default=False, dest="trash_intermediate",
help="Do not trash intermediate outputs (default).")
- parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
+ parser.add_argument("workflow", default=None, help="The workflow to execute")
parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
return parser
def add_arv_hints():
cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
- res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
- use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
- res.close()
+ supported_versions = ["v1.0", "v1.1", "v1.2"]
+ for s in supported_versions:
+ res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-%s.yml' % s)
+ customschema = res.read().decode('utf-8')
+ use_custom_schema(s, "http://arvados.org/cwl", customschema)
+ res.close()
cwltool.process.supportedProcessRequirements.extend([
"http://arvados.org/cwl#RunInSingleContainer",
"http://arvados.org/cwl#OutputDirType",
])
def exit_signal_handler(sigcode, frame):
- logger.error("Caught signal {}, exiting.".format(sigcode))
+ logger.error(str(u"Caught signal {}, exiting.").format(sigcode))
sys.exit(-sigcode)
def main(args, stdout, stderr, api_client=None, keep_client=None,
arvargs = parser.parse_args(args)
if len(arvargs.storage_classes.strip().split(',')) > 1:
- logger.error("Multiple storage classes are not supported currently.")
+ logger.error(str(u"Multiple storage classes are not supported currently."))
return 1
arvargs.use_container = True
if arvargs.update_workflow:
if arvargs.update_workflow.find('-7fd4e-') == 5:
want_api = 'containers'
- elif arvargs.update_workflow.find('-p5p6p-') == 5:
- want_api = 'jobs'
else:
want_api = None
if want_api and arvargs.work_api and want_api != arvargs.work_api:
- logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
+ logger.error(str(u'--update-workflow arg {!r} uses {!r} API, but --api={!r} specified').format(
arvargs.update_workflow, want_api, arvargs.work_api))
return 1
arvargs.work_api = want_api
add_arv_hints()
- for key, val in cwltool.argparser.get_default_args().items():
+ for key, val in viewitems(cwltool.argparser.get_default_args()):
if not hasattr(arvargs, key):
setattr(arvargs, key, val)
if keep_client is None:
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
- except Exception as e:
- logger.error(e)
+ except Exception:
+ logger.exception("Error creating the Arvados CWL Executor")
return 1
+ # Note that unless in debug mode, some stack traces related to user
+ # workflow errors may be suppressed.
if arvargs.debug:
logger.setLevel(logging.DEBUG)
logging.getLogger('arvados').setLevel(logging.DEBUG)
else:
arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
+ if stdout is sys.stdout:
+ # cwltool.main has code to work around encoding issues with
+ # sys.stdout and unix pipes (pipes default to ASCII encoding, but
+ # we want utf-8). When stdout is sys.stdout, set it to None so
+ # that cwltool.main applies that workaround. Don't override it
+ # in any other case, since we still want to be able to capture
+ # stdout for the unit tests.
+ stdout = None
+
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
logger_handler=arvados.log_handler,
custom_schema_callback=add_arv_hints,
loadingContext=executor.loadingContext,
- runtimeContext=executor.runtimeContext)
+ runtimeContext=executor.runtimeContext,
+ input_required=not (arvargs.create_workflow or arvargs.update_workflow))