Merge branch '16169-cwl-hints' refs #16169
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # the Crunch containers API.
8
9 from future.utils import viewitems
10 from builtins import str
11
12 import argparse
13 import logging
14 import os
15 import sys
16 import re
17 import pkg_resources  # part of setuptools
18
19 ### begin monkey patch ###
20 # Monkey patch solution for bug #16169
21 #
22 # There is a bug in upstream cwltool where the version updater needs
23 # to replace the document fragments in the loader index with the
24 # updated ones, but actually it only does it for the root document.
25 # Normally we just fix the bug in upstream but that's challenging
26 # because current cwltool dropped support for Python 2.7 and we're
27 # still supporting py2 in Arvados 2.0 (although py2 support will most
28 # likely be dropped in Arvados 2.1).  Making a bugfix fork comes with
29 # its own complications (it would need to be added to PyPi) so monkey
30 # patching is the least disruptive fix (and is relatively safe because
31 # our cwltool dependency is pinned to a specific version).  This
32 # should be removed as soon as a bugfix goes into upstream cwltool and
33 # we upgrade to it.
34 #
35 import cwltool.load_tool
36 from cwltool.utils import visit_class
37 from six.moves import urllib
38 original_resolve_and_validate_document = cwltool.load_tool.resolve_and_validate_document
39 def wrapped_resolve_and_validate_document(
40         loadingContext,            # type: LoadingContext
41         workflowobj,               # type: Union[CommentedMap, CommentedSeq]
42         uri,                       # type: Text
43         preprocess_only=False,     # type: bool
44         skip_schemas=None,         # type: Optional[bool]
45         ):
46     loadingContext, uri = original_resolve_and_validate_document(loadingContext, workflowobj, uri, preprocess_only, skip_schemas)
47     if loadingContext.do_update in (True, None):
48         fileuri = urllib.parse.urldefrag(uri)[0]
49         def update_index(pr):
50             loadingContext.loader.idx[pr["id"]] = pr
51         visit_class(loadingContext.loader.idx[fileuri], ("CommandLineTool", "Workflow", "ExpressionTool"), update_index)
52     return loadingContext, uri
53 cwltool.load_tool.resolve_and_validate_document = wrapped_resolve_and_validate_document
54 ### end monkey patch ###
55
56 from schema_salad.sourceline import SourceLine
57 import schema_salad.validate as validate
58 import cwltool.main
59 import cwltool.workflow
60 import cwltool.process
61 import cwltool.argparser
62 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
63 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
64
65 import arvados
66 import arvados.config
67 from arvados.keep import KeepClient
68 from arvados.errors import ApiError
69 import arvados.commands._util as arv_cmd
70 from arvados.api import OrderedJsonModel
71
72 from .perf import Perf
73 from ._version import __version__
74 from .executor import ArvCwlExecutor
75
76 # These aren't used directly in this file but
77 # other code expects to import them from here
78 from .arvcontainer import ArvadosContainer
79 from .arvtool import ArvadosCommandTool
80 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
81 from .util import get_current_container
82 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
83 from .arvworkflow import ArvadosWorkflow
84
85 logger = logging.getLogger('arvados.cwl-runner')
86 metrics = logging.getLogger('arvados.cwl-runner.metrics')
87 logger.setLevel(logging.INFO)
88
89 arvados.log_handler.setFormatter(logging.Formatter(
90         '%(asctime)s %(name)s %(levelname)s: %(message)s',
91         '%Y-%m-%d %H:%M:%S'))
92
93 def versionstring():
94     """Print version string of key packages for provenance and debugging."""
95
96     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
97     arvpkg = pkg_resources.require("arvados-python-client")
98     cwlpkg = pkg_resources.require("cwltool")
99
100     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
101                                     "arvados-python-client", arvpkg[0].version,
102                                     "cwltool", cwlpkg[0].version)
103
104
105 def arg_parser():  # type: () -> argparse.ArgumentParser
106     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
107
108     parser.add_argument("--basedir",
109                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
110     parser.add_argument("--outdir", default=os.path.abspath('.'),
111                         help="Output directory, default current directory")
112
113     parser.add_argument("--eval-timeout",
114                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
115                         type=float,
116                         default=20)
117
118     exgroup = parser.add_mutually_exclusive_group()
119     exgroup.add_argument("--print-dot", action="store_true",
120                          help="Print workflow visualization in graphviz format and exit")
121     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
122     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
123
124     exgroup = parser.add_mutually_exclusive_group()
125     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
126     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
127     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
128
129     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
130
131     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
132
133     exgroup = parser.add_mutually_exclusive_group()
134     exgroup.add_argument("--enable-reuse", action="store_true",
135                         default=True, dest="enable_reuse",
136                         help="Enable container reuse (default)")
137     exgroup.add_argument("--disable-reuse", action="store_false",
138                         default=True, dest="enable_reuse",
139                         help="Disable container reuse")
140
141     parser.add_argument("--project-uuid", metavar="UUID", help="Project that will own the workflow containers, if not provided, will go to home project.")
142     parser.add_argument("--output-name", help="Name to use for collection that stores the final output.", default=None)
143     parser.add_argument("--output-tags", help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
144     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
145                         help="Ignore Docker image version when deciding whether to reuse past containers.",
146                         default=False)
147
148     exgroup = parser.add_mutually_exclusive_group()
149     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
150                         default=True, dest="submit")
151     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits containers to Arvados).",
152                         default=True, dest="submit")
153     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
154                          dest="create_workflow")
155     exgroup.add_argument("--create-workflow", action="store_true", help="Register an Arvados workflow that can be run from Workbench")
156     exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")
157
158     exgroup = parser.add_mutually_exclusive_group()
159     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner, wait for completion.",
160                         default=True, dest="wait")
161     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner and exit.",
162                         default=True, dest="wait")
163
164     exgroup = parser.add_mutually_exclusive_group()
165     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
166                         default=True, dest="log_timestamps")
167     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
168                         default=True, dest="log_timestamps")
169
170     parser.add_argument("--api",
171                         default=None, dest="work_api",
172                         choices=("containers",),
173                         help="Select work submission API.  Only supports 'containers'")
174
175     parser.add_argument("--compute-checksum", action="store_true", default=False,
176                         help="Compute checksum of contents while collecting outputs",
177                         dest="compute_checksum")
178
179     parser.add_argument("--submit-runner-ram", type=int,
180                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
181                         default=None)
182
183     parser.add_argument("--submit-runner-image",
184                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
185                         default=None)
186
187     parser.add_argument("--always-submit-runner", action="store_true",
188                         help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
189                         default=False)
190
191     exgroup = parser.add_mutually_exclusive_group()
192     exgroup.add_argument("--submit-request-uuid",
193                          default=None,
194                          help="Update and commit to supplied container request instead of creating a new one.",
195                          metavar="UUID")
196     exgroup.add_argument("--submit-runner-cluster",
197                          help="Submit workflow runner to a remote cluster",
198                          default=None,
199                          metavar="CLUSTER_ID")
200
201     parser.add_argument("--collection-cache-size", type=int,
202                         default=None,
203                         help="Collection cache size (in MiB, default 256).")
204
205     parser.add_argument("--name",
206                         help="Name to use for workflow execution instance.",
207                         default=None)
208
209     parser.add_argument("--on-error",
210                         help="Desired workflow behavior when a step fails.  One of 'stop' (do not submit any more steps) or "
211                         "'continue' (may submit other steps that are not downstream from the error). Default is 'continue'.",
212                         default="continue", choices=("stop", "continue"))
213
214     parser.add_argument("--enable-dev", action="store_true",
215                         help="Enable loading and running development versions "
216                              "of CWL spec.", default=False)
217     parser.add_argument('--storage-classes', default="default",
218                         help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
219
220     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
221                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
222                         default=0)
223
224     parser.add_argument("--priority", type=int,
225                         help="Workflow priority (range 1..1000, higher has precedence over lower)",
226                         default=DEFAULT_PRIORITY)
227
228     parser.add_argument("--disable-validate", dest="do_validate",
229                         action="store_false", default=True,
230                         help=argparse.SUPPRESS)
231
232     parser.add_argument("--disable-js-validation",
233                         action="store_true", default=False,
234                         help=argparse.SUPPRESS)
235
236     parser.add_argument("--thread-count", type=int,
237                         default=1, help="Number of threads to use for job submit and output collection.")
238
239     parser.add_argument("--http-timeout", type=int,
240                         default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
241
242     exgroup = parser.add_mutually_exclusive_group()
243     exgroup.add_argument("--trash-intermediate", action="store_true",
244                         default=False, dest="trash_intermediate",
245                          help="Immediately trash intermediate outputs on workflow success.")
246     exgroup.add_argument("--no-trash-intermediate", action="store_false",
247                         default=False, dest="trash_intermediate",
248                         help="Do not trash intermediate outputs (default).")
249
250     parser.add_argument("workflow", default=None, help="The workflow to execute")
251     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
252
253     return parser
254
255 def add_arv_hints():
256     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
257     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
258     res10 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.0.yml')
259     res11 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.1.yml')
260     customschema10 = res10.read()
261     customschema11 = res11.read()
262     use_custom_schema("v1.0", "http://arvados.org/cwl", customschema10)
263     use_custom_schema("v1.1.0-dev1", "http://arvados.org/cwl", customschema11)
264     use_custom_schema("v1.1", "http://arvados.org/cwl", customschema11)
265     res10.close()
266     res11.close()
267     cwltool.process.supportedProcessRequirements.extend([
268         "http://arvados.org/cwl#RunInSingleContainer",
269         "http://arvados.org/cwl#OutputDirType",
270         "http://arvados.org/cwl#RuntimeConstraints",
271         "http://arvados.org/cwl#PartitionRequirement",
272         "http://arvados.org/cwl#APIRequirement",
273         "http://commonwl.org/cwltool#LoadListingRequirement",
274         "http://arvados.org/cwl#IntermediateOutput",
275         "http://arvados.org/cwl#ReuseRequirement",
276         "http://arvados.org/cwl#ClusterTarget"
277     ])
278
279 def exit_signal_handler(sigcode, frame):
280     logger.error(str(u"Caught signal {}, exiting.").format(sigcode))
281     sys.exit(-sigcode)
282
283 def main(args, stdout, stderr, api_client=None, keep_client=None,
284          install_sig_handlers=True):
285     parser = arg_parser()
286
287     job_order_object = None
288     arvargs = parser.parse_args(args)
289
290     if len(arvargs.storage_classes.strip().split(',')) > 1:
291         logger.error(str(u"Multiple storage classes are not supported currently."))
292         return 1
293
294     arvargs.use_container = True
295     arvargs.relax_path_checks = True
296     arvargs.print_supported_versions = False
297
298     if install_sig_handlers:
299         arv_cmd.install_signal_handlers()
300
301     if arvargs.update_workflow:
302         if arvargs.update_workflow.find('-7fd4e-') == 5:
303             want_api = 'containers'
304         else:
305             want_api = None
306         if want_api and arvargs.work_api and want_api != arvargs.work_api:
307             logger.error(str(u'--update-workflow arg {!r} uses {!r} API, but --api={!r} specified').format(
308                 arvargs.update_workflow, want_api, arvargs.work_api))
309             return 1
310         arvargs.work_api = want_api
311
312     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
313         job_order_object = ({}, "")
314
315     add_arv_hints()
316
317     for key, val in viewitems(cwltool.argparser.get_default_args()):
318         if not hasattr(arvargs, key):
319             setattr(arvargs, key, val)
320
321     try:
322         if api_client is None:
323             api_client = arvados.safeapi.ThreadSafeApiCache(
324                 api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
325                 keep_params={"num_retries": 4})
326             keep_client = api_client.keep
327             # Make an API object now so errors are reported early.
328             api_client.users().current().execute()
329         if keep_client is None:
330             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
331         executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
332     except Exception:
333         logger.exception("Error creating the Arvados CWL Executor")
334         return 1
335
336     # Note that unless in debug mode, some stack traces related to user
337     # workflow errors may be suppressed.
338     if arvargs.debug:
339         logger.setLevel(logging.DEBUG)
340         logging.getLogger('arvados').setLevel(logging.DEBUG)
341
342     if arvargs.quiet:
343         logger.setLevel(logging.WARN)
344         logging.getLogger('arvados').setLevel(logging.WARN)
345         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
346
347     if arvargs.metrics:
348         metrics.setLevel(logging.DEBUG)
349         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
350
351     if arvargs.log_timestamps:
352         arvados.log_handler.setFormatter(logging.Formatter(
353             '%(asctime)s %(name)s %(levelname)s: %(message)s',
354             '%Y-%m-%d %H:%M:%S'))
355     else:
356         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
357
358     if stdout is sys.stdout:
359         # cwltool.main has code to work around encoding issues with
360         # sys.stdout and unix pipes (they default to ASCII encoding,
361         # we want utf-8), so when stdout is sys.stdout set it to None
362         # to take advantage of that.  Don't override it for all cases
363         # since we still want to be able to capture stdout for the
364         # unit tests.
365         stdout = None
366
367     return cwltool.main.main(args=arvargs,
368                              stdout=stdout,
369                              stderr=stderr,
370                              executor=executor.arv_executor,
371                              versionfunc=versionstring,
372                              job_order_object=job_order_object,
373                              logger_handler=arvados.log_handler,
374                              custom_schema_callback=add_arv_hints,
375                              loadingContext=executor.loadingContext,
376                              runtimeContext=executor.runtimeContext,
377                              input_required=not (arvargs.create_workflow or arvargs.update_workflow))