15028: More conformance fixes
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 from future.utils import viewitems
10 from builtins import str
11
12 import argparse
13 import logging
14 import os
15 import sys
16 import re
17 import pkg_resources  # part of setuptools
18
19 from schema_salad.sourceline import SourceLine
20 import schema_salad.validate as validate
21 import cwltool.main
22 import cwltool.workflow
23 import cwltool.process
24 import cwltool.argparser
25 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
26 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
27
28 import arvados
29 import arvados.config
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
32 import arvados.commands._util as arv_cmd
33 from arvados.api import OrderedJsonModel
34
35 from .perf import Perf
36 from ._version import __version__
37 from .executor import ArvCwlExecutor
38
39 # These arn't used directly in this file but
40 # other code expects to import them from here
41 from .arvcontainer import ArvadosContainer
42 from .arvjob import ArvadosJob
43 from .arvtool import ArvadosCommandTool
44 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
45 from .util import get_current_container
46 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
47 from .arvworkflow import ArvadosWorkflow
48
49 logger = logging.getLogger('arvados.cwl-runner')
50 metrics = logging.getLogger('arvados.cwl-runner.metrics')
51 logger.setLevel(logging.INFO)
52
53 arvados.log_handler.setFormatter(logging.Formatter(
54         '%(asctime)s %(name)s %(levelname)s: %(message)s',
55         '%Y-%m-%d %H:%M:%S'))
56
57 def versionstring():
58     """Print version string of key packages for provenance and debugging."""
59
60     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
61     arvpkg = pkg_resources.require("arvados-python-client")
62     cwlpkg = pkg_resources.require("cwltool")
63
64     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
65                                     "arvados-python-client", arvpkg[0].version,
66                                     "cwltool", cwlpkg[0].version)
67
68
69 def arg_parser():  # type: () -> argparse.ArgumentParser
70     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
71
72     parser.add_argument("--basedir",
73                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
74     parser.add_argument("--outdir", default=os.path.abspath('.'),
75                         help="Output directory, default current directory")
76
77     parser.add_argument("--eval-timeout",
78                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
79                         type=float,
80                         default=20)
81
82     exgroup = parser.add_mutually_exclusive_group()
83     exgroup.add_argument("--print-dot", action="store_true",
84                          help="Print workflow visualization in graphviz format and exit")
85     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
86     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
87
88     exgroup = parser.add_mutually_exclusive_group()
89     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
90     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
91     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
92
93     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
94
95     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
96
97     exgroup = parser.add_mutually_exclusive_group()
98     exgroup.add_argument("--enable-reuse", action="store_true",
99                         default=True, dest="enable_reuse",
100                         help="Enable job or container reuse (default)")
101     exgroup.add_argument("--disable-reuse", action="store_false",
102                         default=True, dest="enable_reuse",
103                         help="Disable job or container reuse")
104
105     parser.add_argument("--project-uuid", metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
106     parser.add_argument("--output-name", help="Name to use for collection that stores the final output.", default=None)
107     parser.add_argument("--output-tags", help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
108     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
109                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
110                         default=False)
111
112     exgroup = parser.add_mutually_exclusive_group()
113     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
114                         default=True, dest="submit")
115     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
116                         default=True, dest="submit")
117     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
118                          dest="create_workflow")
119     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
120     exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
121
122     exgroup = parser.add_mutually_exclusive_group()
123     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
124                         default=True, dest="wait")
125     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
126                         default=True, dest="wait")
127
128     exgroup = parser.add_mutually_exclusive_group()
129     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
130                         default=True, dest="log_timestamps")
131     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
132                         default=True, dest="log_timestamps")
133
134     parser.add_argument("--api",
135                         default=None, dest="work_api",
136                         choices=("jobs", "containers"),
137                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
138
139     parser.add_argument("--compute-checksum", action="store_true", default=False,
140                         help="Compute checksum of contents while collecting outputs",
141                         dest="compute_checksum")
142
143     parser.add_argument("--submit-runner-ram", type=int,
144                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
145                         default=None)
146
147     parser.add_argument("--submit-runner-image",
148                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
149                         default=None)
150
151     parser.add_argument("--always-submit-runner", action="store_true",
152                         help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
153                         default=False)
154
155     exgroup = parser.add_mutually_exclusive_group()
156     exgroup.add_argument("--submit-request-uuid",
157                          default=None,
158                          help="Update and commit to supplied container request instead of creating a new one (containers API only).",
159                          metavar="UUID")
160     exgroup.add_argument("--submit-runner-cluster",
161                          help="Submit workflow runner to a remote cluster (containers API only)",
162                          default=None,
163                          metavar="CLUSTER_ID")
164
165     parser.add_argument("--collection-cache-size", type=int,
166                         default=None,
167                         help="Collection cache size (in MiB, default 256).")
168
169     parser.add_argument("--name",
170                         help="Name to use for workflow execution instance.",
171                         default=None)
172
173     parser.add_argument("--on-error",
174                         help="Desired workflow behavior when a step fails.  One of 'stop' (do not submit any more steps) or "
175                         "'continue' (may submit other steps that are not downstream from the error). Default is 'continue'.",
176                         default="continue", choices=("stop", "continue"))
177
178     parser.add_argument("--enable-dev", action="store_true",
179                         help="Enable loading and running development versions "
180                              "of CWL spec.", default=False)
181     parser.add_argument('--storage-classes', default="default",
182                         help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
183
184     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
185                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
186                         default=0)
187
188     parser.add_argument("--priority", type=int,
189                         help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
190                         default=DEFAULT_PRIORITY)
191
192     parser.add_argument("--disable-validate", dest="do_validate",
193                         action="store_false", default=True,
194                         help=argparse.SUPPRESS)
195
196     parser.add_argument("--disable-js-validation",
197                         action="store_true", default=False,
198                         help=argparse.SUPPRESS)
199
200     parser.add_argument("--thread-count", type=int,
201                         default=1, help="Number of threads to use for job submit and output collection.")
202
203     parser.add_argument("--http-timeout", type=int,
204                         default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
205
206     exgroup = parser.add_mutually_exclusive_group()
207     exgroup.add_argument("--trash-intermediate", action="store_true",
208                         default=False, dest="trash_intermediate",
209                          help="Immediately trash intermediate outputs on workflow success.")
210     exgroup.add_argument("--no-trash-intermediate", action="store_false",
211                         default=False, dest="trash_intermediate",
212                         help="Do not trash intermediate outputs (default).")
213
214     parser.add_argument("workflow", default=None, help="The workflow to execute")
215     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
216
217     return parser
218
219 def add_arv_hints():
220     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
221     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
222     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.0.yml')
223     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.1.yml')
224     customschema = res.read()
225     use_custom_schema("v1.0", "http://arvados.org/cwl", customschema)
226     use_custom_schema("v1.1.0-dev1", "http://arvados.org/cwl", customschema)
227     res.close()
228     cwltool.process.supportedProcessRequirements.extend([
229         "http://arvados.org/cwl#RunInSingleContainer",
230         "http://arvados.org/cwl#OutputDirType",
231         "http://arvados.org/cwl#RuntimeConstraints",
232         "http://arvados.org/cwl#PartitionRequirement",
233         "http://arvados.org/cwl#APIRequirement",
234         "http://commonwl.org/cwltool#LoadListingRequirement",
235         "http://arvados.org/cwl#IntermediateOutput",
236         "http://arvados.org/cwl#ReuseRequirement",
237         "http://arvados.org/cwl#ClusterTarget"
238     ])
239
240 def exit_signal_handler(sigcode, frame):
241     logger.error(str(u"Caught signal {}, exiting.").format(sigcode))
242     sys.exit(-sigcode)
243
244 def main(args, stdout, stderr, api_client=None, keep_client=None,
245          install_sig_handlers=True):
246     parser = arg_parser()
247
248     job_order_object = None
249     arvargs = parser.parse_args(args)
250
251     if len(arvargs.storage_classes.strip().split(',')) > 1:
252         logger.error(str(u"Multiple storage classes are not supported currently."))
253         return 1
254
255     arvargs.use_container = True
256     arvargs.relax_path_checks = True
257     arvargs.print_supported_versions = False
258
259     if install_sig_handlers:
260         arv_cmd.install_signal_handlers()
261
262     if arvargs.update_workflow:
263         if arvargs.update_workflow.find('-7fd4e-') == 5:
264             want_api = 'containers'
265         elif arvargs.update_workflow.find('-p5p6p-') == 5:
266             want_api = 'jobs'
267         else:
268             want_api = None
269         if want_api and arvargs.work_api and want_api != arvargs.work_api:
270             logger.error(str(u'--update-workflow arg {!r} uses {!r} API, but --api={!r} specified').format(
271                 arvargs.update_workflow, want_api, arvargs.work_api))
272             return 1
273         arvargs.work_api = want_api
274
275     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
276         job_order_object = ({}, "")
277
278     add_arv_hints()
279
280     for key, val in viewitems(cwltool.argparser.get_default_args()):
281         if not hasattr(arvargs, key):
282             setattr(arvargs, key, val)
283
284     try:
285         if api_client is None:
286             api_client = arvados.safeapi.ThreadSafeApiCache(
287                 api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
288                 keep_params={"num_retries": 4})
289             keep_client = api_client.keep
290             # Make an API object now so errors are reported early.
291             api_client.users().current().execute()
292         if keep_client is None:
293             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
294         executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
295     except Exception:
296         logger.exception("Error creating the Arvados CWL Executor")
297         return 1
298
299     # Note that unless in debug mode, some stack traces related to user
300     # workflow errors may be suppressed. See ArvadosJob.done().
301     if arvargs.debug:
302         logger.setLevel(logging.DEBUG)
303         logging.getLogger('arvados').setLevel(logging.DEBUG)
304
305     if arvargs.quiet:
306         logger.setLevel(logging.WARN)
307         logging.getLogger('arvados').setLevel(logging.WARN)
308         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
309
310     if arvargs.metrics:
311         metrics.setLevel(logging.DEBUG)
312         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
313
314     if arvargs.log_timestamps:
315         arvados.log_handler.setFormatter(logging.Formatter(
316             '%(asctime)s %(name)s %(levelname)s: %(message)s',
317             '%Y-%m-%d %H:%M:%S'))
318     else:
319         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
320
321     if stdout is sys.stdout:
322         # cwltool.main has code to work around encoding issues with
323         # sys.stdout and unix pipes (they default to ASCII encoding,
324         # we want utf-8), so when stdout is sys.stdout set it to None
325         # to take advantage of that.  Don't override it for all cases
326         # since we still want to be able to capture stdout for the
327         # unit tests.
328         stdout = None
329
330     return cwltool.main.main(args=arvargs,
331                              stdout=stdout,
332                              stderr=stderr,
333                              executor=executor.arv_executor,
334                              versionfunc=versionstring,
335                              job_order_object=job_order_object,
336                              logger_handler=arvados.log_handler,
337                              custom_schema_callback=add_arv_hints,
338                              loadingContext=executor.loadingContext,
339                              runtimeContext=executor.runtimeContext)