2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # the Crunch containers API.
9 from future.utils import viewitems
10 from builtins import str
17 import pkg_resources # part of setuptools
19 ### begin monkey patch ###
20 # Monkey patch solution for bug #16169
22 # There is a bug in upstream cwltool where the version updater needs
23 # to replace the document fragments in the loader index with the
24 # updated ones, but actually it only does it for the root document.
25 # Normally we would just fix the bug upstream, but that's challenging
26 # because current cwltool dropped support for Python 2.7 and we're
27 # still supporting py2 in Arvados 2.0 (although py2 support will most
28 # likely be dropped in Arvados 2.1). Making a bugfix fork comes with
29 # its own complications (it would need to be added to PyPI) so monkey
30 # patching is the least disruptive fix (and is relatively safe because
31 # our cwltool dependency is pinned to a specific version). This
32 # should be removed as soon as a bugfix goes into upstream cwltool and
35 import cwltool.load_tool
36 from cwltool.utils import visit_class
37 from six.moves import urllib
38 original_resolve_and_validate_document = cwltool.load_tool.resolve_and_validate_document
# The line above saves cwltool's own implementation so the wrapper can delegate to it.
#
# Wrapper that is installed in place of
# cwltool.load_tool.resolve_and_validate_document (see the assignment at the
# end of this block).  It calls the saved original and then, when
# loadingContext.do_update is True or None, writes objects back into
# loadingContext.loader.idx keyed by their "id".
# Returns the same (loadingContext, uri) pair the original returned.
39 def wrapped_resolve_and_validate_document(
40 loadingContext, # type: LoadingContext
41 workflowobj, # type: Union[CommentedMap, CommentedSeq]
43 preprocess_only=False, # type: bool
44 skip_schemas=None, # type: Optional[bool]
# Delegate to the saved original, forwarding every argument positionally.
# NOTE(review): `uri` is forwarded here but its parameter declaration is not
# among the visible signature lines -- confirm it sits between `workflowobj`
# and `preprocess_only`.
46 loadingContext, uri = original_resolve_and_validate_document(loadingContext, workflowobj, uri, preprocess_only, skip_schemas)
# Only refresh the index when do_update is True or left as None.
47 if loadingContext.do_update in (True, None):
# urldefrag() splits off the "#fragment"; element [0] is the URI without it.
48 fileuri = urllib.parse.urldefrag(uri)[0]
# Store the object in the loader index under its own "id".
# NOTE(review): `pr` and `update_index` are not defined in the visible lines;
# presumably this assignment is the body of a nested `update_index(pr)`
# helper -- confirm.
50 loadingContext.loader.idx[pr["id"]] = pr
# Hand the index entry for the whole file to visit_class together with the
# three process class names and the update_index callback.
51 visit_class(loadingContext.loader.idx[fileuri], ("CommandLineTool", "Workflow", "ExpressionTool"), update_index)
52 return loadingContext, uri
# Install the wrapper so later lookups of
# cwltool.load_tool.resolve_and_validate_document get the patched version.
53 cwltool.load_tool.resolve_and_validate_document = wrapped_resolve_and_validate_document
54 ### end monkey patch ###
56 from schema_salad.sourceline import SourceLine
57 import schema_salad.validate as validate
59 import cwltool.workflow
60 import cwltool.process
61 import cwltool.argparser
62 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
63 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
67 from arvados.keep import KeepClient
68 from arvados.errors import ApiError
69 import arvados.commands._util as arv_cmd
70 from arvados.api import OrderedJsonModel
72 from .perf import Perf
73 from ._version import __version__
74 from .executor import ArvCwlExecutor
76 # These aren't used directly in this file but
77 # other code expects to import them from here
78 from .arvcontainer import ArvadosContainer
79 from .arvtool import ArvadosCommandTool
80 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
81 from .util import get_current_container
82 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
83 from .arvworkflow import ArvadosWorkflow
# Module-level loggers: `logger` for the runner itself and `metrics` for the
# 'arvados.cwl-runner.metrics' child logger.
85 logger = logging.getLogger('arvados.cwl-runner')
86 metrics = logging.getLogger('arvados.cwl-runner.metrics')
# The runner logger starts at INFO; main() may change the level later.
87 logger.setLevel(logging.INFO)
# Give arvados.log_handler a formatter whose message layout is
# "<asctime> <logger name> <level>: <message>".
# NOTE(review): the remaining arguments / closing parentheses of this call
# are not visible in this view -- confirm the date format that follows.
89 arvados.log_handler.setFormatter(logging.Formatter(
90 '%(asctime)s %(name)s %(levelname)s: %(message)s',
# Body of the helper referenced elsewhere in this file as `versionstring`.
# NOTE(review): the enclosing `def` line is not visible in this view -- confirm.
# Despite the wording of the docstring below, the visible code *returns* the
# string; it does not print it.
94 """Print version string of key packages for provenance and debugging."""
# pkg_resources.require() is used here to look up each installed
# distribution; element [0] of each result supplies the `.version` below.
96 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
97 arvpkg = pkg_resources.require("arvados-python-client")
98 cwlpkg = pkg_resources.require("cwltool")
# Result layout:
#   "<sys.argv[0]> <arvados-cwl-runner ver>, arvados-python-client <ver>, cwltool <ver>"
100 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
101 "arvados-python-client", arvpkg[0].version,
102 "cwltool", cwlpkg[0].version)
105 def arg_parser(): # type: () -> argparse.ArgumentParser
# Build the argparse parser for the arvados-cwl-runner command line.
# Paired on/off flags below (e.g. --enable-reuse / --disable-reuse) share one
# `dest` and use store_true / store_false, each inside a mutually exclusive
# group so only one of the pair may be given.
# NOTE(review): several add_argument(...) calls are cut off in this view
# (their trailing keyword arguments / closing parenthesis are not visible),
# and no `return parser` line is visible although the type comment and main()
# expect an ArgumentParser -- confirm against the full file.
106 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
# --- Input/output locations ---
108 parser.add_argument("--basedir",
109 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
# Default output directory is the absolute path of the current directory,
# computed when the parser is built.
110 parser.add_argument("--outdir", default=os.path.abspath('.'),
111 help="Output directory, default current directory")
# NOTE(review): the type/default for --eval-timeout are not visible here; the
# help text states a 20s default -- confirm.
113 parser.add_argument("--eval-timeout",
114 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
# --- Actions that replace a normal run (at most one) ---
118 exgroup = parser.add_mutually_exclusive_group()
119 exgroup.add_argument("--print-dot", action="store_true",
120 help="Print workflow visualization in graphviz format and exit")
# versionstring() is called here, i.e. evaluated every time the parser is built.
121 exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
122 exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
# --- Logging verbosity (at most one) ---
124 exgroup = parser.add_mutually_exclusive_group()
125 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
126 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
127 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
129 parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
131 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
# --- Container reuse: both flags write `enable_reuse`, default True ---
133 exgroup = parser.add_mutually_exclusive_group()
134 exgroup.add_argument("--enable-reuse", action="store_true",
135 default=True, dest="enable_reuse",
136 help="Enable container reuse (default)")
137 exgroup.add_argument("--disable-reuse", action="store_false",
138 default=True, dest="enable_reuse",
139 help="Disable container reuse")
# --- Ownership and naming of the run and its output collection ---
141 parser.add_argument("--project-uuid", metavar="UUID", help="Project that will own the workflow containers, if not provided, will go to home project.")
142 parser.add_argument("--output-name", help="Name to use for collection that stores the final output.", default=None)
143 parser.add_argument("--output-tags", help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
144 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
145 help="Ignore Docker image version when deciding whether to reuse past containers.",
# --- Run mode (at most one).  --submit / --local both write `submit`
# (default True); --create-template is a deprecated alias that writes
# `create_workflow`. ---
148 exgroup = parser.add_mutually_exclusive_group()
149 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
150 default=True, dest="submit")
151 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits containers to Arvados).",
152 default=True, dest="submit")
153 exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
154 dest="create_workflow")
155 exgroup.add_argument("--create-workflow", action="store_true", help="Register an Arvados workflow that can be run from Workbench")
156 exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")
# --- Wait for completion: both flags write `wait`, default True ---
158 exgroup = parser.add_mutually_exclusive_group()
159 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner, wait for completion.",
160 default=True, dest="wait")
161 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner and exit.",
162 default=True, dest="wait")
# --- Log timestamps: both flags write `log_timestamps`, default True ---
164 exgroup = parser.add_mutually_exclusive_group()
165 exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
166 default=True, dest="log_timestamps")
167 exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
168 default=True, dest="log_timestamps")
# --api is stored as `work_api`; "containers" is the only accepted value and
# the default is None (main() may set it when --update-workflow is given).
170 parser.add_argument("--api",
171 default=None, dest="work_api",
172 choices=("containers",),
173 help="Select work submission API. Only supports 'containers'")
175 parser.add_argument("--compute-checksum", action="store_true", default=False,
176 help="Compute checksum of contents while collecting outputs",
177 dest="compute_checksum")
# --- Workflow runner (submission) options ---
179 parser.add_argument("--submit-runner-ram", type=int,
180 help="RAM (in MiB) required for the workflow runner job (default 1024)",
# The help text interpolates this package's __version__ as the image tag.
183 parser.add_argument("--submit-runner-image",
184 help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
187 parser.add_argument("--always-submit-runner", action="store_true",
188 help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
# --submit-request-uuid and --submit-runner-cluster cannot be combined.
191 exgroup = parser.add_mutually_exclusive_group()
192 exgroup.add_argument("--submit-request-uuid",
194 help="Update and commit to supplied container request instead of creating a new one.",
196 exgroup.add_argument("--submit-runner-cluster",
197 help="Submit workflow runner to a remote cluster",
199 metavar="CLUSTER_ID")
201 parser.add_argument("--collection-cache-size", type=int,
203 help="Collection cache size (in MiB, default 256).")
205 parser.add_argument("--name",
206 help="Name to use for workflow execution instance.",
# --- Failure handling and misc behaviour ---
209 parser.add_argument("--on-error",
210 help="Desired workflow behavior when a step fails. One of 'stop' (do not submit any more steps) or "
211 "'continue' (may submit other steps that are not downstream from the error). Default is 'continue'.",
212 default="continue", choices=("stop", "continue"))
214 parser.add_argument("--enable-dev", action="store_true",
215 help="Enable loading and running development versions "
216 "of CWL spec.", default=False)
# main() rejects values containing more than one comma-separated class.
217 parser.add_argument('--storage-classes', default="default",
218 help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
220 parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
221 help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
224 parser.add_argument("--priority", type=int,
225 help="Workflow priority (range 1..1000, higher has precedence over lower)",
226 default=DEFAULT_PRIORITY)
# The next two options use help=argparse.SUPPRESS, so they are accepted but
# omitted from --help output.
228 parser.add_argument("--disable-validate", dest="do_validate",
229 action="store_false", default=True,
230 help=argparse.SUPPRESS)
232 parser.add_argument("--disable-js-validation",
233 action="store_true", default=False,
234 help=argparse.SUPPRESS)
236 parser.add_argument("--thread-count", type=int,
237 default=1, help="Number of threads to use for job submit and output collection.")
# Default timeout is 5*60 = 300 seconds; main() passes it to the API client.
239 parser.add_argument("--http-timeout", type=int,
240 default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
# --- Trash intermediates: both flags write `trash_intermediate`, default False ---
242 exgroup = parser.add_mutually_exclusive_group()
243 exgroup.add_argument("--trash-intermediate", action="store_true",
244 default=False, dest="trash_intermediate",
245 help="Immediately trash intermediate outputs on workflow success.")
246 exgroup.add_argument("--no-trash-intermediate", action="store_false",
247 default=False, dest="trash_intermediate",
248 help="Do not trash intermediate outputs (default).")
# --- Positionals: the workflow document, then everything that follows it
# (argparse.REMAINDER collects all remaining arguments into `job_order`). ---
250 parser.add_argument("workflow", default=None, help="The workflow to execute")
251 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
# Statements that register Arvados-specific CWL extensions with cwltool.
# NOTE(review): the enclosing `def` line is not visible in this view; main()
# passes a callable named `add_arv_hints` as custom_schema_callback, so these
# lines are presumably its body -- confirm.
#
# Replace both of cwltool's ACCEPTLIST regexes with ".*", a pattern that
# matches any string.
256 cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
257 cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
# Open and read the two extension schema files bundled as package resources.
# NOTE(review): no close() for res10/res11 is visible in this view -- confirm
# the streams are closed.
258 res10 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.0.yml')
259 res11 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.1.yml')
260 customschema10 = res10.read()
261 customschema11 = res11.read()
# Pass each schema to use_custom_schema with the "http://arvados.org/cwl"
# namespace; the v1.1 schema text is used for both "v1.1.0-dev1" and "v1.1".
262 use_custom_schema("v1.0", "http://arvados.org/cwl", customschema10)
263 use_custom_schema("v1.1.0-dev1", "http://arvados.org/cwl", customschema11)
264 use_custom_schema("v1.1", "http://arvados.org/cwl", customschema11)
# Append the extension requirement URIs to cwltool's list of supported
# process requirements.
267 cwltool.process.supportedProcessRequirements.extend([
268 "http://arvados.org/cwl#RunInSingleContainer",
269 "http://arvados.org/cwl#OutputDirType",
270 "http://arvados.org/cwl#RuntimeConstraints",
271 "http://arvados.org/cwl#PartitionRequirement",
272 "http://arvados.org/cwl#APIRequirement",
273 "http://commonwl.org/cwltool#LoadListingRequirement",
274 "http://arvados.org/cwl#IntermediateOutput",
275 "http://arvados.org/cwl#ReuseRequirement",
276 "http://arvados.org/cwl#ClusterTarget"
279 def exit_signal_handler(sigcode, frame):
# Signal-handler-shaped callback (signal number, stack frame): logs the
# received signal number at ERROR level.  `frame` is not used in the visible
# code.
# NOTE(review): the name and message say "exiting", but no exit call is
# visible in this view -- confirm one follows.
280 logger.error(str(u"Caught signal {}, exiting.").format(sigcode))
283 def main(args, stdout, stderr, api_client=None, keep_client=None,
284 install_sig_handlers=True):
# Entry point: parse `args`, build the Arvados API/Keep clients and the
# ArvCwlExecutor, configure logging, then hand control to cwltool.main.main
# and return its result.
#   args                 -- command-line argument list given to parse_args()
#   stdout, stderr       -- output streams (stdout is compared with sys.stdout below)
#   api_client           -- optional pre-built API client; created here when None
#   keep_client          -- optional pre-built Keep client; created here when None
#   install_sig_handlers -- when true, call arv_cmd.install_signal_handlers()
# NOTE(review): a number of guard lines (`else:`, `try:`/`except`, early
# `return`s, some `if` conditions) are not visible in this view; comments
# below are hedged where they depend on them.
285 parser = arg_parser()
287 job_order_object = None
288 arvargs = parser.parse_args(args)
# More than one comma-separated storage class is reported as an error.
# NOTE(review): the statement that follows the log call is not visible -- confirm.
290 if len(arvargs.storage_classes.strip().split(',')) > 1:
291 logger.error(str(u"Multiple storage classes are not supported currently."))
# Fixed settings added to the parsed namespace; they are not command-line options here.
294 arvargs.use_container = True
295 arvargs.relax_path_checks = True
296 arvargs.print_supported_versions = False
298 if install_sig_handlers:
299 arv_cmd.install_signal_handlers()
# For --update-workflow: when '-7fd4e-' starts at index 5 of the UUID, the
# containers API is wanted.
# NOTE(review): the branch that sets want_api otherwise is not visible -- confirm.
301 if arvargs.update_workflow:
302 if arvargs.update_workflow.find('-7fd4e-') == 5:
303 want_api = 'containers'
# An explicit --api value that disagrees with the UUID-derived API is an error.
306 if want_api and arvargs.work_api and want_api != arvargs.work_api:
307 logger.error(str(u'--update-workflow arg {!r} uses {!r} API, but --api={!r} specified').format(
308 arvargs.update_workflow, want_api, arvargs.work_api))
310 arvargs.work_api = want_api
# Registering/updating a workflow without inputs: supply an empty job order
# (empty dict plus empty string).
312 if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
313 job_order_object = ({}, "")
# Copy in cwltool's default argument values for every attribute that our own
# parser did not define.
317 for key, val in viewitems(cwltool.argparser.get_default_args()):
318 if not hasattr(arvargs, key):
319 setattr(arvargs, key, val)
# Build the API client when the caller did not supply one.
# NOTE(review): as written in the visible lines, `keep_client = api_client.keep`
# appears to sit in this branch and would replace a caller-supplied
# keep_client -- confirm the intended nesting.
322 if api_client is None:
323 api_client = arvados.safeapi.ThreadSafeApiCache(
324 api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
325 keep_params={"num_retries": 4})
326 keep_client = api_client.keep
327 # Make an API object now so errors are reported early.
328 api_client.users().current().execute()
329 if keep_client is None:
330 keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
331 executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
# NOTE(review): logger.exception() is meant to be called from an exception
# handler; the surrounding try/except lines are not visible -- confirm.
333 logger.exception("Error creating the Arvados CWL Executor")
336 # Note that unless in debug mode, some stack traces related to user
337 # workflow errors may be suppressed.
# Log-level adjustments.  NOTE(review): the conditions guarding the three
# groups below are not visible; presumably --debug, --quiet and --metrics -- confirm.
339 logger.setLevel(logging.DEBUG)
340 logging.getLogger('arvados').setLevel(logging.DEBUG)
343 logger.setLevel(logging.WARN)
344 logging.getLogger('arvados').setLevel(logging.WARN)
345 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
348 metrics.setLevel(logging.DEBUG)
349 logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
# With timestamps: "<date time> <name> <level>: <message>"; the second
# formatter omits the timestamp (its `else:` line is not visible in this view).
351 if arvargs.log_timestamps:
352 arvados.log_handler.setFormatter(logging.Formatter(
353 '%(asctime)s %(name)s %(levelname)s: %(message)s',
354 '%Y-%m-%d %H:%M:%S'))
356 arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
358 if stdout is sys.stdout:
359 # cwltool.main has code to work around encoding issues with
360 # sys.stdout and unix pipes (they default to ASCII encoding,
361 # we want utf-8), so when stdout is sys.stdout set it to None
362 # to take advantage of that. Don't override it for all cases
363 # since we still want to be able to capture stdout for the
# Delegate the actual run to cwltool, passing the executor's callable and its
# loading/runtime contexts.  An input object is not required when creating or
# updating a workflow.
# NOTE(review): some keyword arguments of this call (original lines 368-369)
# are not visible in this view.
367 return cwltool.main.main(args=arvargs,
370 executor=executor.arv_executor,
371 versionfunc=versionstring,
372 job_order_object=job_order_object,
373 logger_handler=arvados.log_handler,
374 custom_schema_callback=add_arv_hints,
375 loadingContext=executor.loadingContext,
376 runtimeContext=executor.runtimeContext,
377 input_required=not (arvargs.create_workflow or arvargs.update_workflow))