Merge branch 'master' into 13937-keepstore-prometheus
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 from future.utils import viewitems
10 from builtins import str
11
12 import argparse
13 import logging
14 import os
15 import sys
16 import re
17 import pkg_resources  # part of setuptools
18
19 from schema_salad.sourceline import SourceLine
20 import schema_salad.validate as validate
21 import cwltool.main
22 import cwltool.workflow
23 import cwltool.process
24 import cwltool.argparser
25 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
26 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
27
28 import arvados
29 import arvados.config
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
32 import arvados.commands._util as arv_cmd
33 from arvados.api import OrderedJsonModel
34
35 from .perf import Perf
36 from ._version import __version__
37 from .executor import ArvCwlExecutor
38
39 # These aren't used directly in this file, but
40 # other code expects to import them from here.
41 from .arvcontainer import ArvadosContainer
42 from .arvjob import ArvadosJob
43 from .arvtool import ArvadosCommandTool
44 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
45 from .util import get_current_container
46 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
47 from .arvworkflow import ArvadosWorkflow
48
# Logger for cwl-runner messages; INFO is the level until main() applies
# --debug or --quiet.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Separate logger for timing metrics; main() raises it to DEBUG for --metrics.
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# Timestamped format installed on the Arvados SDK log handler at import time.
# main() installs a formatter again according to --log-timestamps.
arvados.log_handler.setFormatter(
    logging.Formatter(
        fmt='%(asctime)s %(name)s %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'))
56
def versionstring():
    """Return a version summary of key packages for provenance and debugging.

    The result names the invoked program (sys.argv[0]) followed by the
    installed versions of arvados-cwl-runner, arvados-python-client and
    cwltool, as reported by pkg_resources.
    """
    # Resolve all three distributions up front so that a missing package
    # fails before any string formatting takes place.
    runner_dists = pkg_resources.require("arvados-cwl-runner")
    client_dists = pkg_resources.require("arvados-python-client")
    cwltool_dists = pkg_resources.require("cwltool")

    fields = (
        sys.argv[0], runner_dists[0].version,
        "arvados-python-client", client_dists[0].version,
        "cwltool", cwltool_dists[0].version,
    )
    return "%s %s, %s %s, %s %s" % fields
67
68
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command line parser for the Arvados CWL runner.

    Options are registered in the order they appear in the help output.
    Paired on/off switches share one ``dest`` and live in a mutually
    exclusive group.  Note that versionstring() is evaluated while the
    parser is being built, because its result is handed to the
    ``--version`` action.

    Returns the configured argparse.ArgumentParser.
    """
    parser = argparse.ArgumentParser(
        description='Arvados executor for Common Workflow Language')

    parser.add_argument(
        "--basedir",
        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument(
        "--outdir",
        default=os.path.abspath('.'),
        help="Output directory, default current directory")

    parser.add_argument(
        "--eval-timeout",
        type=float,
        default=20,
        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.")

    # Actions that inspect the workflow instead of running it.
    inspect_group = parser.add_mutually_exclusive_group()
    inspect_group.add_argument(
        "--print-dot",
        action="store_true",
        help="Print workflow visualization in graphviz format and exit")
    inspect_group.add_argument(
        "--version",
        action="version",
        version=versionstring(),
        help="Print version and exit")
    inspect_group.add_argument(
        "--validate",
        action="store_true",
        help="Validate CWL document only.")

    # Logging verbosity.
    verbosity_group = parser.add_mutually_exclusive_group()
    verbosity_group.add_argument(
        "--verbose",
        action="store_true",
        help="Default logging")
    verbosity_group.add_argument(
        "--quiet",
        action="store_true",
        help="Only print warnings and errors.")
    verbosity_group.add_argument(
        "--debug",
        action="store_true",
        help="Print even more logging")

    parser.add_argument(
        "--metrics",
        action="store_true",
        help="Print timing metrics")

    parser.add_argument(
        "--tool-help",
        action="store_true",
        help="Print command line help for tool")

    # Job/container reuse switch (on by default).
    reuse_group = parser.add_mutually_exclusive_group()
    reuse_group.add_argument(
        "--enable-reuse",
        action="store_true",
        dest="enable_reuse",
        default=True,
        help="Enable job or container reuse (default)")
    reuse_group.add_argument(
        "--disable-reuse",
        action="store_false",
        dest="enable_reuse",
        default=True,
        help="Disable job or container reuse")

    parser.add_argument(
        "--project-uuid",
        metavar="UUID",
        help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument(
        "--output-name",
        default=None,
        help="Name to use for collection that stores the final output.")
    parser.add_argument(
        "--output-tags",
        default=None,
        help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.")
    parser.add_argument(
        "--ignore-docker-for-reuse",
        action="store_true",
        default=False,
        help="Ignore Docker image version when deciding whether to reuse past jobs.")

    # What to do with the workflow: submit, run locally, or register it.
    mode_group = parser.add_mutually_exclusive_group()
    mode_group.add_argument(
        "--submit",
        action="store_true",
        dest="submit",
        default=True,
        help="Submit workflow to run on Arvados.")
    mode_group.add_argument(
        "--local",
        action="store_false",
        dest="submit",
        default=True,
        help="Run workflow on local host (submits jobs to Arvados).")
    mode_group.add_argument(
        "--create-template",
        action="store_true",
        dest="create_workflow",
        help="(Deprecated) synonym for --create-workflow.")
    mode_group.add_argument(
        "--create-workflow",
        action="store_true",
        help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    mode_group.add_argument(
        "--update-workflow",
        metavar="UUID",
        help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    # Whether to block until the submitted runner finishes.
    wait_group = parser.add_mutually_exclusive_group()
    wait_group.add_argument(
        "--wait",
        action="store_true",
        dest="wait",
        default=True,
        help="After submitting workflow runner job, wait for completion.")
    wait_group.add_argument(
        "--no-wait",
        action="store_false",
        dest="wait",
        default=True,
        help="Submit workflow runner job and exit.")

    # Timestamp prefix on log lines (on by default).
    timestamp_group = parser.add_mutually_exclusive_group()
    timestamp_group.add_argument(
        "--log-timestamps",
        action="store_true",
        dest="log_timestamps",
        default=True,
        help="Prefix logging lines with timestamp")
    timestamp_group.add_argument(
        "--no-log-timestamps",
        action="store_false",
        dest="log_timestamps",
        default=True,
        help="No timestamp on logging lines")

    parser.add_argument(
        "--api",
        dest="work_api",
        default=None,
        choices=("jobs", "containers"),
        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument(
        "--compute-checksum",
        action="store_true",
        dest="compute_checksum",
        default=False,
        help="Compute checksum of contents while collecting outputs")

    parser.add_argument(
        "--submit-runner-ram",
        type=int,
        default=None,
        help="RAM (in MiB) required for the workflow runner job (default 1024)")

    parser.add_argument(
        "--submit-runner-image",
        default=None,
        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__)

    parser.add_argument(
        "--always-submit-runner",
        action="store_true",
        default=False,
        help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool")

    # Where the runner container request goes (containers API only).
    target_group = parser.add_mutually_exclusive_group()
    target_group.add_argument(
        "--submit-request-uuid",
        metavar="UUID",
        default=None,
        help="Update and commit to supplied container request instead of creating a new one (containers API only).")
    target_group.add_argument(
        "--submit-runner-cluster",
        metavar="CLUSTER_ID",
        default=None,
        help="Submit workflow runner to a remote cluster (containers API only)")

    parser.add_argument(
        "--collection-cache-size",
        type=int,
        default=None,
        help="Collection cache size (in MiB, default 256).")

    parser.add_argument(
        "--name",
        default=None,
        help="Name to use for workflow execution instance.")

    parser.add_argument(
        "--on-error",
        default="continue",
        choices=("stop", "continue"),
        help="Desired workflow behavior when a step fails.  One of 'stop' (do not submit any more steps) or "
             "'continue' (may submit other steps that are not downstream from the error). Default is 'continue'.")

    parser.add_argument(
        "--enable-dev",
        action="store_true",
        default=False,
        help="Enable loading and running development versions "
             "of CWL spec.")
    parser.add_argument(
        '--storage-classes',
        default="default",
        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument(
        "--intermediate-output-ttl",
        type=int,
        metavar="N",
        default=0,
        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).")

    parser.add_argument(
        "--priority",
        type=int,
        default=DEFAULT_PRIORITY,
        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)")

    # The next two switches are hidden from --help output.
    parser.add_argument(
        "--disable-validate",
        action="store_false",
        dest="do_validate",
        default=True,
        help=argparse.SUPPRESS)

    parser.add_argument(
        "--disable-js-validation",
        action="store_true",
        default=False,
        help=argparse.SUPPRESS)

    parser.add_argument(
        "--thread-count",
        type=int,
        default=1,
        help="Number of threads to use for job submit and output collection.")

    parser.add_argument(
        "--http-timeout",
        type=int,
        dest="http_timeout",
        default=5*60,
        help="API request timeout in seconds. Default is 300 seconds (5 minutes).")

    # Trashing of intermediate outputs (off by default).
    trash_group = parser.add_mutually_exclusive_group()
    trash_group.add_argument(
        "--trash-intermediate",
        action="store_true",
        dest="trash_intermediate",
        default=False,
        help="Immediately trash intermediate outputs on workflow success.")
    trash_group.add_argument(
        "--no-trash-intermediate",
        action="store_false",
        dest="trash_intermediate",
        default=False,
        help="Do not trash intermediate outputs (default).")

    # Positionals: the workflow, then everything else as the input object.
    parser.add_argument(
        "workflow",
        default=None,
        help="The workflow to execute")
    parser.add_argument(
        "job_order",
        nargs=argparse.REMAINDER,
        help="The input object to the workflow.")

    return parser
218
def add_arv_hints():
    """Register the Arvados CWL extensions with cwltool.

    Three things happen, all of them by mutating cwltool module state:

    * the command line tool accept-list patterns are replaced with a
      pattern that matches anything;
    * the bundled 'arv-cwl-schema.yml' is registered as the custom schema
      for CWL "v1.0" under the http://arvados.org/cwl namespace;
    * the Arvados requirement/hint URIs are added to cwltool's list of
      supported process requirements.

    This function is invoked more than once per process (main() calls it
    directly and also passes it to cwltool as custom_schema_callback), so
    it is written to be safe to repeat: URIs already present in the
    supported list are not appended again.
    """
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE

    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    finally:
        # Release the resource stream even if reading or registering fails.
        res.close()

    arv_requirements = [
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement",
        "http://arvados.org/cwl#ClusterTarget"
    ]
    supported = cwltool.process.supportedProcessRequirements
    # Only add what is missing so repeated calls do not grow the list.
    supported.extend([req for req in arv_requirements if req not in supported])
236
def exit_signal_handler(sigcode, frame):
    """Signal handler: log the signal number, then exit the process.

    sigcode is the received signal number; the exit status passed to
    sys.exit() is its negation.  frame is accepted to satisfy the signal
    handler signature and is not used.
    """
    message = str(u"Caught signal {}, exiting.").format(sigcode)
    logger.error(message)
    sys.exit(-sigcode)
240
def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    """Parse the command line, set up the Arvados executor and run cwltool.

    args is the list of command line arguments handed to the parser.
    stdout and stderr are forwarded unchanged to cwltool.main.main().
    api_client, when None, is replaced by a new ThreadSafeApiCache; in that
    case keep_client is taken from that cache (overriding any value passed
    in).  If keep_client is still None afterwards, a KeepClient is created
    for the given api_client.  install_sig_handlers controls whether the
    Arvados command signal handlers are installed.

    Returns 1 when argument checks or executor construction fail;
    otherwise returns whatever cwltool.main.main() returns.
    """
    arvargs = arg_parser().parse_args(args)
    job_order_object = None

    storage_classes = arvargs.storage_classes.strip().split(',')
    if len(storage_classes) > 1:
        logger.error(str(u"Multiple storage classes are not supported currently."))
        return 1

    # Fixed settings that are not exposed as command line options here.
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    workflow_uuid = arvargs.update_workflow
    if workflow_uuid:
        # The infix at offset 5 of the UUID selects which API owns the object.
        want_api = None
        for infix, api_name in (('-7fd4e-', 'containers'), ('-p5p6p-', 'jobs')):
            if workflow_uuid.find(infix) == 5:
                want_api = api_name
                break
        api_conflict = want_api and arvargs.work_api and want_api != arvargs.work_api
        if api_conflict:
            logger.error(str(u'--update-workflow arg {!r} uses {!r} API, but --api={!r} specified').format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    # Creating or updating a workflow without inputs uses an empty input object.
    registering = arvargs.create_workflow or arvargs.update_workflow
    if registering and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    # Fill in any cwltool option this parser does not define with cwltool's default.
    for option, default_value in viewitems(cwltool.argparser.get_default_args()):
        if not hasattr(arvargs, option):
            setattr(arvargs, option, default_value)

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(
                api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
                keep_params={"num_retries": 4})
            keep_client = api_client.keep
            # Make an API object now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        logger.error(e)
        return 1

    # Logging levels are applied only after the executor has been built.
    if arvargs.debug:
        for debug_logger in (logger, logging.getLogger('arvados')):
            debug_logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        for quiet_logger in (logger,
                             logging.getLogger('arvados'),
                             logging.getLogger('arvados.arv-run')):
            quiet_logger.setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        formatter = logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S')
    else:
        formatter = logging.Formatter('%(name)s %(levelname)s: %(message)s')
    arvados.log_handler.setFormatter(formatter)

    return cwltool.main.main(
        args=arvargs,
        stdout=stdout,
        stderr=stderr,
        executor=executor.arv_executor,
        versionfunc=versionstring,
        job_order_object=job_order_object,
        logger_handler=arvados.log_handler,
        custom_schema_callback=add_arv_hints,
        loadingContext=executor.loadingContext,
        runtimeContext=executor.runtimeContext)