14440: Fix formatting of cwl-run-options table.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
import argparse
import logging
import os
import re
import sys
import pkg_resources  # part of setuptools

from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
import cwltool.main
import cwltool.workflow
import cwltool.process
import cwltool.argparser
import cwltool.command_line_tool
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29 import arvados.commands._util as arv_cmd
30 from arvados.api import OrderedJsonModel
31
32 from .perf import Perf
33 from ._version import __version__
34 from .executor import ArvCwlExecutor
35
# These aren't used directly in this file, but
# other code expects to import them from here.
38 from .arvcontainer import ArvadosContainer
39 from .arvjob import ArvadosJob
40 from .arvtool import ArvadosCommandTool
41 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
42 from .util import get_current_container
43 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
44 from .arvworkflow import ArvadosWorkflow
45
# Module loggers.  The runner logger starts at INFO; main() may raise or
# lower it according to --debug / --quiet, and may enable `metrics`.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# Default to timestamped log lines on the shared arvados handler; main()
# re-installs a formatter depending on --log-timestamps.
arvados.log_handler.setFormatter(
    logging.Formatter(fmt='%(asctime)s %(name)s %(levelname)s: %(message)s',
                      datefmt='%Y-%m-%d %H:%M:%S'))
53
def versionstring():
    """Return a one-line version banner for provenance and debugging.

    The banner is the program name (``sys.argv[0]``) with the installed
    arvados-cwl-runner version, followed by the installed versions of
    arvados-python-client and cwltool, e.g.
    ``<prog> X, arvados-python-client Y, cwltool Z``.
    """
    dists = [pkg_resources.require(name)
             for name in ("arvados-cwl-runner", "arvados-python-client", "cwltool")]
    runner_ver, client_ver, cwl_ver = (dist[0].version for dist in dists)
    return "%s %s, %s %s, %s %s" % (sys.argv[0], runner_ver,
                                    "arvados-python-client", client_ver,
                                    "cwltool", cwl_ver)
64
65
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the argparse parser for arvados-cwl-runner's command line.

    Options are registered in the order they should appear in ``--help``.
    Several pairs of flags live in mutually exclusive groups and write to a
    shared ``dest`` (e.g. --enable-reuse/--disable-reuse -> enable_reuse).

    Note that the ``--version`` text is computed eagerly: versionstring() is
    called every time this parser is built.

    :returns: a configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    # The default is resolved once, when the parser is built.
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    # Modes that short-circuit a normal run; at most one may be given.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    # Logging verbosity; main() applies --debug / --quiet to logger levels.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Both flags write enable_reuse; reuse is on unless --disable-reuse is given.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    # What to do with the workflow: --submit/--local both write `submit`
    # (default True); --create-template is an alias that sets create_workflow.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    # Both flags write `wait` (default True).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    # Both flags write `log_timestamps` (default True); main() picks the
    # log formatter from it.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    # Stored as `work_api`; None means "not chosen on the command line"
    # (main() may fill it in from the --update-workflow UUID).
    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    # NOTE(review): default=None here while the help text advertises 1024;
    # presumably the default is applied downstream — confirm.
    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=None)

    # NOTE(review): default=None; the advertised arvados/jobs:<version> image
    # is presumably substituted downstream — confirm.
    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--always-submit-runner", action="store_true",
                        help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
                        default=False)

    # Reusing an existing container request and targeting a remote cluster
    # cannot be combined.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit-request-uuid", type=str,
                         default=None,
                         help="Update and commit to supplied container request instead of creating a new one (containers API only).",
                         metavar="UUID")
    exgroup.add_argument("--submit-runner-cluster", type=str,
                         help="Submit workflow runner to a remote cluster (containers API only)",
                         default=None,
                         metavar="CLUSTER_ID")

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error",
                        help="Desired workflow behavior when a step fails.  One of 'stop' (do not submit any more steps) or "
                        "'continue' (may submit other steps that are not downstream from the error). Default is 'stop'.",
                        default="stop", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)
    # Accepts a comma separated string, but main() currently rejects more
    # than one entry.
    parser.add_argument('--storage-classes', default="default", type=str,
                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)

    # Hidden options (help suppressed): skip document / JavaScript validation.
    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    # Passed by main() as the API client's request timeout.
    parser.add_argument("--http-timeout", type=int,
                        default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")

    # Both flags write `trash_intermediate`; the default (False) is the same
    # value --no-trash-intermediate stores.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                        default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                        default=False, dest="trash_intermediate",
                        help="Do not trash intermediate outputs (default).")

    # Positionals: the workflow document, then everything remaining
    # (REMAINDER) as the job order / input object.
    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
211
def add_arv_hints():
    """Register the Arvados CWL extensions with cwltool.

    * Replaces cwltool's ``ACCEPTLIST_RE`` / ``ACCEPTLIST_EN_RELAXED_RE``
      patterns with one that matches anything (presumably relaxing cwltool's
      path-name checks).
    * Loads the bundled ``arv-cwl-schema.yml`` as the custom schema for the
      ``http://arvados.org/cwl`` namespace on CWL v1.0.
    * Adds the Arvados (and cwltool LoadListing) requirement URIs to
      ``cwltool.process.supportedProcessRequirements``.

    main() calls this directly and also passes it to cwltool as
    ``custom_schema_callback``, so it may run more than once per process.
    Requirement URIs already present are therefore not appended again,
    which keeps the shared list from growing on every call.
    """
    match_anything = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = match_anything
    cwltool.command_line_tool.ACCEPTLIST_RE = match_anything

    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    finally:
        # Close the stream even if reading or schema registration fails.
        # try/finally is used rather than ``with`` because the returned
        # stream object may not support the context-manager protocol on
        # older Pythons.
        res.close()

    supported = cwltool.process.supportedProcessRequirements
    for requirement in (
            "http://arvados.org/cwl#RunInSingleContainer",
            "http://arvados.org/cwl#OutputDirType",
            "http://arvados.org/cwl#RuntimeConstraints",
            "http://arvados.org/cwl#PartitionRequirement",
            "http://arvados.org/cwl#APIRequirement",
            "http://commonwl.org/cwltool#LoadListingRequirement",
            "http://arvados.org/cwl#IntermediateOutput",
            "http://arvados.org/cwl#ReuseRequirement",
            "http://arvados.org/cwl#ClusterTarget"):
        if requirement not in supported:
            supported.append(requirement)
229
def exit_signal_handler(sigcode, frame):
    """Log the received signal number and terminate via ``sys.exit``.

    The ``(signum, frame)`` signature matches what ``signal.signal``
    handlers receive; ``frame`` is unused.  The exit status passed to
    ``sys.exit`` is the negated signal number.
    """
    message = "Caught signal {}, exiting.".format(sigcode)
    logger.error(message)
    sys.exit(-sigcode)
233
def _api_for_workflow_uuid(uuid):
    """Return the work API implied by the object-type infix of ``uuid``.

    ``-7fd4e-`` at index 5 (an Arvados workflow) maps to 'containers', and
    ``-p5p6p-`` at index 5 (a pipeline template) maps to 'jobs'.  Any other
    value yields None, meaning the API cannot be inferred.
    """
    if uuid.find('-7fd4e-') == 5:
        return 'containers'
    if uuid.find('-p5p6p-') == 5:
        return 'jobs'
    return None


def _configure_logging(arvargs):
    """Apply the --debug, --quiet, --metrics and --log-timestamps options.

    Adjusts the levels of this module's loggers and of the 'arvados' loggers,
    and installs the matching formatter on ``arvados.log_handler``.
    """
    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        # logging.WARNING is the non-deprecated spelling of logging.WARN
        # (same numeric level).
        logger.setLevel(logging.WARNING)
        logging.getLogger('arvados').setLevel(logging.WARNING)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARNING)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))


def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    """Run arvados-cwl-runner with the command line ``args``.

    Parses ``args``, builds an ArvCwlExecutor and delegates to
    ``cwltool.main.main``, whose return value is passed through.

    :param args: argument list, as accepted by ``parser.parse_args``.
    :param stdout: stream handed to cwltool for normal output.
    :param stderr: stream handed to cwltool for diagnostics.
    :param api_client: Arvados API client to use.  When None, a
        ``ThreadSafeApiCache`` is created and immediately probed with
        ``users().current()`` so errors are reported early.
    :param keep_client: Keep client to use.  When ``api_client`` is None this
        is replaced by the new cache's Keep client; otherwise, if None, a
        ``KeepClient`` is built around ``api_client``.
    :param install_sig_handlers: install the shared ``arv_cmd`` signal
        handlers.
    :returns: 1 on argument or setup errors, otherwise the result of
        ``cwltool.main.main``.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")
        return 1

    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
        want_api = _api_for_workflow_uuid(arvargs.update_workflow)
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        if want_api:
            # Only override when the UUID identifies an API; an unrecognized
            # UUID must not discard an explicit --api choice.
            arvargs.work_api = want_api

    # Creating/updating a workflow needs no real inputs: supply an empty
    # job order so cwltool does not try to read one.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    # Fill in any cwltool option this parser does not define, so cwltool
    # sees a complete namespace.
    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(
                api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
                keep_params={"num_retries": 4})
            keep_client = api_client.keep
            # Make an API object now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        # Same one-line message as before; with --debug also attach the
        # traceback so setup failures can be diagnosed.
        logger.error(e, exc_info=arvargs.debug)
        return 1

    _configure_logging(arvargs)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=executor.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=executor.loadingContext,
                             runtimeContext=executor.runtimeContext)