15028: Bump cwltool to accept v1.1
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 from future.utils import viewitems
10 from builtins import str
11
12 import argparse
13 import logging
14 import os
15 import sys
16 import re
17 import pkg_resources  # part of setuptools
18
19 from schema_salad.sourceline import SourceLine
20 import schema_salad.validate as validate
21 import cwltool.main
22 import cwltool.workflow
23 import cwltool.process
24 import cwltool.argparser
25 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
26 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
27
28 import arvados
29 import arvados.config
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
32 import arvados.commands._util as arv_cmd
33 from arvados.api import OrderedJsonModel
34
35 from .perf import Perf
36 from ._version import __version__
37 from .executor import ArvCwlExecutor
38
39 # These arn't used directly in this file but
40 # other code expects to import them from here
41 from .arvcontainer import ArvadosContainer
42 from .arvjob import ArvadosJob
43 from .arvtool import ArvadosCommandTool
44 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
45 from .util import get_current_container
46 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
47 from .arvworkflow import ArvadosWorkflow
48
49 logger = logging.getLogger('arvados.cwl-runner')
50 metrics = logging.getLogger('arvados.cwl-runner.metrics')
51 logger.setLevel(logging.INFO)
52
53 arvados.log_handler.setFormatter(logging.Formatter(
54         '%(asctime)s %(name)s %(levelname)s: %(message)s',
55         '%Y-%m-%d %H:%M:%S'))
56
57 def versionstring():
58     """Print version string of key packages for provenance and debugging."""
59
60     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
61     arvpkg = pkg_resources.require("arvados-python-client")
62     cwlpkg = pkg_resources.require("cwltool")
63
64     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
65                                     "arvados-python-client", arvpkg[0].version,
66                                     "cwltool", cwlpkg[0].version)
67
68
69 def arg_parser():  # type: () -> argparse.ArgumentParser
70     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
71
72     parser.add_argument("--basedir",
73                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
74     parser.add_argument("--outdir", default=os.path.abspath('.'),
75                         help="Output directory, default current directory")
76
77     parser.add_argument("--eval-timeout",
78                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
79                         type=float,
80                         default=20)
81
82     exgroup = parser.add_mutually_exclusive_group()
83     exgroup.add_argument("--print-dot", action="store_true",
84                          help="Print workflow visualization in graphviz format and exit")
85     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
86     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
87
88     exgroup = parser.add_mutually_exclusive_group()
89     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
90     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
91     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
92
93     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
94
95     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
96
97     exgroup = parser.add_mutually_exclusive_group()
98     exgroup.add_argument("--enable-reuse", action="store_true",
99                         default=True, dest="enable_reuse",
100                         help="Enable job or container reuse (default)")
101     exgroup.add_argument("--disable-reuse", action="store_false",
102                         default=True, dest="enable_reuse",
103                         help="Disable job or container reuse")
104
105     parser.add_argument("--project-uuid", metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
106     parser.add_argument("--output-name", help="Name to use for collection that stores the final output.", default=None)
107     parser.add_argument("--output-tags", help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
108     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
109                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
110                         default=False)
111
112     exgroup = parser.add_mutually_exclusive_group()
113     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
114                         default=True, dest="submit")
115     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
116                         default=True, dest="submit")
117     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
118                          dest="create_workflow")
119     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
120     exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
121
122     exgroup = parser.add_mutually_exclusive_group()
123     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
124                         default=True, dest="wait")
125     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
126                         default=True, dest="wait")
127
128     exgroup = parser.add_mutually_exclusive_group()
129     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
130                         default=True, dest="log_timestamps")
131     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
132                         default=True, dest="log_timestamps")
133
134     parser.add_argument("--api",
135                         default=None, dest="work_api",
136                         choices=("jobs", "containers"),
137                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
138
139     parser.add_argument("--compute-checksum", action="store_true", default=False,
140                         help="Compute checksum of contents while collecting outputs",
141                         dest="compute_checksum")
142
143     parser.add_argument("--submit-runner-ram", type=int,
144                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
145                         default=None)
146
147     parser.add_argument("--submit-runner-image",
148                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
149                         default=None)
150
151     parser.add_argument("--always-submit-runner", action="store_true",
152                         help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
153                         default=False)
154
155     exgroup = parser.add_mutually_exclusive_group()
156     exgroup.add_argument("--submit-request-uuid",
157                          default=None,
158                          help="Update and commit to supplied container request instead of creating a new one (containers API only).",
159                          metavar="UUID")
160     exgroup.add_argument("--submit-runner-cluster",
161                          help="Submit workflow runner to a remote cluster (containers API only)",
162                          default=None,
163                          metavar="CLUSTER_ID")
164
165     parser.add_argument("--collection-cache-size", type=int,
166                         default=None,
167                         help="Collection cache size (in MiB, default 256).")
168
169     parser.add_argument("--name",
170                         help="Name to use for workflow execution instance.",
171                         default=None)
172
173     parser.add_argument("--on-error",
174                         help="Desired workflow behavior when a step fails.  One of 'stop' (do not submit any more steps) or "
175                         "'continue' (may submit other steps that are not downstream from the error). Default is 'continue'.",
176                         default="continue", choices=("stop", "continue"))
177
178     parser.add_argument("--enable-dev", action="store_true",
179                         help="Enable loading and running development versions "
180                              "of CWL spec.", default=False)
181     parser.add_argument('--storage-classes', default="default",
182                         help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
183
184     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
185                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
186                         default=0)
187
188     parser.add_argument("--priority", type=int,
189                         help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
190                         default=DEFAULT_PRIORITY)
191
192     parser.add_argument("--disable-validate", dest="do_validate",
193                         action="store_false", default=True,
194                         help=argparse.SUPPRESS)
195
196     parser.add_argument("--disable-js-validation",
197                         action="store_true", default=False,
198                         help=argparse.SUPPRESS)
199
200     parser.add_argument("--thread-count", type=int,
201                         default=1, help="Number of threads to use for job submit and output collection.")
202
203     parser.add_argument("--http-timeout", type=int,
204                         default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
205
206     exgroup = parser.add_mutually_exclusive_group()
207     exgroup.add_argument("--trash-intermediate", action="store_true",
208                         default=False, dest="trash_intermediate",
209                          help="Immediately trash intermediate outputs on workflow success.")
210     exgroup.add_argument("--no-trash-intermediate", action="store_false",
211                         default=False, dest="trash_intermediate",
212                         help="Do not trash intermediate outputs (default).")
213
214     parser.add_argument("workflow", default=None, help="The workflow to execute")
215     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
216
217     return parser
218
219 def add_arv_hints():
220     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
221     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
222     res10 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.0.yml')
223     res11 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.1.yml')
224     customschema10 = res10.read()
225     customschema11 = res11.read()
226     use_custom_schema("v1.0", "http://arvados.org/cwl", customschema10)
227     use_custom_schema("v1.1.0-dev1", "http://arvados.org/cwl", customschema11)
228     use_custom_schema("v1.1", "http://arvados.org/cwl", customschema11)
229     res10.close()
230     res11.close()
231     cwltool.process.supportedProcessRequirements.extend([
232         "http://arvados.org/cwl#RunInSingleContainer",
233         "http://arvados.org/cwl#OutputDirType",
234         "http://arvados.org/cwl#RuntimeConstraints",
235         "http://arvados.org/cwl#PartitionRequirement",
236         "http://arvados.org/cwl#APIRequirement",
237         "http://commonwl.org/cwltool#LoadListingRequirement",
238         "http://arvados.org/cwl#IntermediateOutput",
239         "http://arvados.org/cwl#ReuseRequirement",
240         "http://arvados.org/cwl#ClusterTarget"
241     ])
242
243 def exit_signal_handler(sigcode, frame):
244     logger.error(str(u"Caught signal {}, exiting.").format(sigcode))
245     sys.exit(-sigcode)
246
247 def main(args, stdout, stderr, api_client=None, keep_client=None,
248          install_sig_handlers=True):
249     parser = arg_parser()
250
251     job_order_object = None
252     arvargs = parser.parse_args(args)
253
254     if len(arvargs.storage_classes.strip().split(',')) > 1:
255         logger.error(str(u"Multiple storage classes are not supported currently."))
256         return 1
257
258     arvargs.use_container = True
259     arvargs.relax_path_checks = True
260     arvargs.print_supported_versions = False
261
262     if install_sig_handlers:
263         arv_cmd.install_signal_handlers()
264
265     if arvargs.update_workflow:
266         if arvargs.update_workflow.find('-7fd4e-') == 5:
267             want_api = 'containers'
268         elif arvargs.update_workflow.find('-p5p6p-') == 5:
269             want_api = 'jobs'
270         else:
271             want_api = None
272         if want_api and arvargs.work_api and want_api != arvargs.work_api:
273             logger.error(str(u'--update-workflow arg {!r} uses {!r} API, but --api={!r} specified').format(
274                 arvargs.update_workflow, want_api, arvargs.work_api))
275             return 1
276         arvargs.work_api = want_api
277
278     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
279         job_order_object = ({}, "")
280
281     add_arv_hints()
282
283     for key, val in viewitems(cwltool.argparser.get_default_args()):
284         if not hasattr(arvargs, key):
285             setattr(arvargs, key, val)
286
287     try:
288         if api_client is None:
289             api_client = arvados.safeapi.ThreadSafeApiCache(
290                 api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
291                 keep_params={"num_retries": 4})
292             keep_client = api_client.keep
293             # Make an API object now so errors are reported early.
294             api_client.users().current().execute()
295         if keep_client is None:
296             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
297         executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
298     except Exception:
299         logger.exception("Error creating the Arvados CWL Executor")
300         return 1
301
302     # Note that unless in debug mode, some stack traces related to user
303     # workflow errors may be suppressed. See ArvadosJob.done().
304     if arvargs.debug:
305         logger.setLevel(logging.DEBUG)
306         logging.getLogger('arvados').setLevel(logging.DEBUG)
307
308     if arvargs.quiet:
309         logger.setLevel(logging.WARN)
310         logging.getLogger('arvados').setLevel(logging.WARN)
311         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
312
313     if arvargs.metrics:
314         metrics.setLevel(logging.DEBUG)
315         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
316
317     if arvargs.log_timestamps:
318         arvados.log_handler.setFormatter(logging.Formatter(
319             '%(asctime)s %(name)s %(levelname)s: %(message)s',
320             '%Y-%m-%d %H:%M:%S'))
321     else:
322         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
323
324     if stdout is sys.stdout:
325         # cwltool.main has code to work around encoding issues with
326         # sys.stdout and unix pipes (they default to ASCII encoding,
327         # we want utf-8), so when stdout is sys.stdout set it to None
328         # to take advantage of that.  Don't override it for all cases
329         # since we still want to be able to capture stdout for the
330         # unit tests.
331         stdout = None
332
333     return cwltool.main.main(args=arvargs,
334                              stdout=stdout,
335                              stderr=stderr,
336                              executor=executor.arv_executor,
337                              versionfunc=versionstring,
338                              job_order_object=job_order_object,
339                              logger_handler=arvados.log_handler,
340                              custom_schema_callback=add_arv_hints,
341                              loadingContext=executor.loadingContext,
342                              runtimeContext=executor.runtimeContext)