14198: Add --always-submit-runner
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import re
14 import pkg_resources  # part of setuptools
15
16 from schema_salad.sourceline import SourceLine
17 import schema_salad.validate as validate
18 import cwltool.main
19 import cwltool.workflow
20 import cwltool.process
21 import cwltool.argparser
22 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
23 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29 import arvados.commands._util as arv_cmd
30 from arvados.api import OrderedJsonModel
31
32 from .perf import Perf
33 from ._version import __version__
34 from .executor import ArvCwlExecutor
35
36 # These aren't used directly in this file, but
37 # other code expects to import them from here.
38 from .arvcontainer import ArvadosContainer
39 from .arvjob import ArvadosJob
40 from .arvtool import ArvadosCommandTool
41 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
42 from .util import get_current_container
43 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
44 from .arvworkflow import ArvadosWorkflow
45
# Module loggers: general runner messages, plus a separate logger for timing
# metrics (main() raises it to DEBUG when --metrics is given).
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# Default to timestamped log lines; main() re-applies the format according to
# --log-timestamps / --no-log-timestamps.
arvados.log_handler.setFormatter(
    logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s',
                      '%Y-%m-%d %H:%M:%S'))
53
def versionstring():
    """Return a one-line version summary of the key packages.

    The result names this program (``sys.argv[0]``) followed by the installed
    versions of arvados-cwl-runner, arvados-python-client and cwltool, for
    provenance and debugging (it backs ``--version``).
    """
    package_names = ("arvados-cwl-runner", "arvados-python-client", "cwltool")
    # pkg_resources.require() returns a list whose first entry is the
    # distribution that was asked for.
    runner_ver, client_ver, cwltool_ver = [
        pkg_resources.require(name)[0].version for name in package_names]
    return "%s %s, %s %s, %s %s" % (sys.argv[0], runner_ver,
                                    "arvados-python-client", client_ver,
                                    "cwltool", cwltool_ver)
64
65
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command-line parser for the Arvados CWL runner.

    Mutually exclusive choices share an argparse group.  Note that building
    the parser calls versionstring() (for ``--version``), which queries
    installed package metadata.

    :returns: a configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(
        description='Arvados executor for Common Workflow Language')

    # -- Paths and expression evaluation -----------------------------------
    parser.add_argument(
        "--basedir", type=str,
        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument(
        "--outdir", type=str, default=os.path.abspath('.'),
        help="Output directory, default current directory")
    parser.add_argument(
        "--eval-timeout", type=float, default=20,
        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.")

    # -- Alternative actions -----------------------------------------------
    action_group = parser.add_mutually_exclusive_group()
    action_group.add_argument(
        "--print-dot", action="store_true",
        help="Print workflow visualization in graphviz format and exit")
    action_group.add_argument(
        "--version", action="version", version=versionstring(),
        help="Print version and exit")
    action_group.add_argument(
        "--validate", action="store_true",
        help="Validate CWL document only.")

    # -- Logging verbosity -------------------------------------------------
    verbosity_group = parser.add_mutually_exclusive_group()
    verbosity_group.add_argument(
        "--verbose", action="store_true", help="Default logging")
    verbosity_group.add_argument(
        "--quiet", action="store_true", help="Only print warnings and errors.")
    verbosity_group.add_argument(
        "--debug", action="store_true", help="Print even more logging")

    parser.add_argument(
        "--metrics", action="store_true", help="Print timing metrics")
    parser.add_argument(
        "--tool-help", action="store_true",
        help="Print command line help for tool")

    # -- Job / container reuse ---------------------------------------------
    reuse_group = parser.add_mutually_exclusive_group()
    reuse_group.add_argument(
        "--enable-reuse", action="store_true", default=True, dest="enable_reuse",
        help="Enable job or container reuse (default)")
    reuse_group.add_argument(
        "--disable-reuse", action="store_false", default=True, dest="enable_reuse",
        help="Disable job or container reuse")

    # -- Ownership and final output ----------------------------------------
    parser.add_argument(
        "--project-uuid", type=str, metavar="UUID",
        help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument(
        "--output-name", type=str, default=None,
        help="Name to use for collection that stores the final output.")
    parser.add_argument(
        "--output-tags", type=str, default=None,
        help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.")
    parser.add_argument(
        "--ignore-docker-for-reuse", action="store_true", default=False,
        help="Ignore Docker image version when deciding whether to reuse past jobs.")

    # -- Execution mode ----------------------------------------------------
    mode_group = parser.add_mutually_exclusive_group()
    mode_group.add_argument(
        "--submit", action="store_true", default=True, dest="submit",
        help="Submit workflow to run on Arvados.")
    mode_group.add_argument(
        "--local", action="store_false", default=True, dest="submit",
        help="Run workflow on local host (submits jobs to Arvados).")
    mode_group.add_argument(
        "--create-template", action="store_true", dest="create_workflow",
        help="(Deprecated) synonym for --create-workflow.")
    mode_group.add_argument(
        "--create-workflow", action="store_true",
        help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    mode_group.add_argument(
        "--update-workflow", type=str, metavar="UUID",
        help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    # -- Waiting for the runner --------------------------------------------
    wait_group = parser.add_mutually_exclusive_group()
    wait_group.add_argument(
        "--wait", action="store_true", default=True, dest="wait",
        help="After submitting workflow runner job, wait for completion.")
    wait_group.add_argument(
        "--no-wait", action="store_false", default=True, dest="wait",
        help="Submit workflow runner job and exit.")

    # -- Log line formatting -----------------------------------------------
    timestamp_group = parser.add_mutually_exclusive_group()
    timestamp_group.add_argument(
        "--log-timestamps", action="store_true", default=True, dest="log_timestamps",
        help="Prefix logging lines with timestamp")
    timestamp_group.add_argument(
        "--no-log-timestamps", action="store_false", default=True, dest="log_timestamps",
        help="No timestamp on logging lines")

    # -- Work API and workflow runner --------------------------------------
    parser.add_argument(
        "--api", type=str, default=None, dest="work_api",
        choices=("jobs", "containers"),
        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
    parser.add_argument(
        "--compute-checksum", action="store_true", default=False,
        dest="compute_checksum",
        help="Compute checksum of contents while collecting outputs")
    parser.add_argument(
        "--submit-runner-ram", type=int, default=None,
        help="RAM (in MiB) required for the workflow runner job (default 1024)")
    parser.add_argument(
        "--submit-runner-image", type=str, default=None,
        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__)
    parser.add_argument(
        "--always-submit-runner", action="store_true", default=False,
        help="Always submit a runner to manage the workflow, even when running only a single CommandLineTool")

    request_group = parser.add_mutually_exclusive_group()
    request_group.add_argument(
        "--submit-request-uuid", type=str, default=None,
        help="Update and commit to supplied container request instead of creating a new one (containers API only).")
    request_group.add_argument(
        "--submit-runner-cluster", type=str, default=None,
        help="Submit toplevel runner to a remote cluster (containers API only)")

    # -- Miscellaneous execution settings ----------------------------------
    parser.add_argument(
        "--name", type=str, default=None,
        help="Name to use for workflow execution instance.")
    parser.add_argument(
        "--on-error", type=str, default="continue", choices=("stop", "continue"),
        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
             "Default is 'continue'.")
    parser.add_argument(
        "--enable-dev", action="store_true", default=False,
        help="Enable loading and running development versions "
             "of CWL spec.")
    parser.add_argument(
        '--storage-classes', type=str, default="default",
        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
    parser.add_argument(
        "--intermediate-output-ttl", type=int, metavar="N", default=0,
        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).")
    parser.add_argument(
        "--priority", type=int, default=DEFAULT_PRIORITY,
        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)")

    # Hidden options (argparse.SUPPRESS keeps them out of --help).
    parser.add_argument(
        "--disable-validate", action="store_false", default=True,
        dest="do_validate", help=argparse.SUPPRESS)
    parser.add_argument(
        "--disable-js-validation", action="store_true", default=False,
        help=argparse.SUPPRESS)

    parser.add_argument(
        "--thread-count", type=int, default=4,
        help="Number of threads to use for job submit and output collection.")
    parser.add_argument(
        "--http-timeout", type=int, default=5*60, dest="http_timeout",
        help="API request timeout in seconds. Default is 300 seconds (5 minutes).")

    trash_group = parser.add_mutually_exclusive_group()
    trash_group.add_argument(
        "--trash-intermediate", action="store_true", default=False,
        dest="trash_intermediate",
        help="Immediately trash intermediate outputs on workflow success.")
    trash_group.add_argument(
        "--no-trash-intermediate", action="store_false", default=False,
        dest="trash_intermediate",
        help="Do not trash intermediate outputs (default).")

    # Positionals: the CWL document, then every remaining argument is
    # collected (argparse.REMAINDER) as the input object.
    parser.add_argument(
        "workflow", type=str, default=None,
        help="The workflow to execute")
    parser.add_argument(
        "job_order", nargs=argparse.REMAINDER,
        help="The input object to the workflow.")

    return parser
208
def add_arv_hints():
    """Register the Arvados CWL extensions with cwltool.

    - Replaces cwltool's ``ACCEPTLIST_EN_RELAXED_RE`` / ``ACCEPTLIST_RE``
      with a regex that matches any string.
    - Loads the bundled ``arv-cwl-schema.yml`` and registers it via
      ``use_custom_schema`` for CWL ``v1.0`` under ``http://arvados.org/cwl``.
    - Adds the Arvados (and cwltool LoadListing) requirement URIs to
      ``cwltool.process.supportedProcessRequirements``.

    main() calls this directly and also passes it to cwltool as
    ``custom_schema_callback``, so it may run more than once.  URIs that are
    already registered are therefore not appended a second time.
    """
    # Import explicitly rather than relying on another cwltool module having
    # imported command_line_tool as a side effect.
    import cwltool.command_line_tool

    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE

    # Close the packaged resource even if reading it fails.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        schema = res.read()
    finally:
        res.close()
    use_custom_schema("v1.0", "http://arvados.org/cwl", schema)

    supported = cwltool.process.supportedProcessRequirements
    for uri in (
            "http://arvados.org/cwl#RunInSingleContainer",
            "http://arvados.org/cwl#OutputDirType",
            "http://arvados.org/cwl#RuntimeConstraints",
            "http://arvados.org/cwl#PartitionRequirement",
            "http://arvados.org/cwl#APIRequirement",
            "http://commonwl.org/cwltool#LoadListingRequirement",
            "http://arvados.org/cwl#IntermediateOutput",
            "http://arvados.org/cwl#ReuseRequirement",
            "http://arvados.org/cwl#ClusterTarget"):
        # Keep repeated calls from growing the list with duplicates.
        if uri not in supported:
            supported.append(uri)
226
def exit_signal_handler(sigcode, frame):
    """Log the caught signal, then terminate with exit code ``-sigcode``.

    The (signum, frame) signature matches what ``signal.signal`` handlers
    receive; ``frame`` is ignored.

    NOTE(review): on POSIX a negative exit code is reduced modulo 256
    (e.g. signal 15 exits with status 241), not the shell's 128+N convention.
    """
    message = "Caught signal {}, exiting.".format(sigcode)
    logger.error(message)
    # Equivalent to sys.exit(-sigcode).
    raise SystemExit(-sigcode)
230
def _api_for_workflow_uuid(uuid):
    """Return the work API implied by a workflow/template UUID, or None.

    UUIDs with the ``-7fd4e-`` type infix map to 'containers', those with
    ``-p5p6p-`` map to 'jobs'; any other UUID yields None.
    """
    if uuid.find('-7fd4e-') == 5:
        return 'containers'
    if uuid.find('-p5p6p-') == 5:
        return 'jobs'
    return None


def _configure_logging(arvargs):
    """Apply --debug, --quiet, --metrics and --[no-]log-timestamps."""
    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))


def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    """Entry point for the Arvados CWL runner.

    Parses ``args`` with arg_parser(), validates Arvados-specific options,
    builds an ArvCwlExecutor and hands control to ``cwltool.main.main``.

    :param args: command-line arguments to parse.
    :param stdout: output stream passed through to cwltool.
    :param stderr: error stream passed through to cwltool.
    :param api_client: Arvados API client.  If None, a ThreadSafeApiCache is
        created and a request is issued immediately so connection/credential
        errors surface early.
    :param keep_client: Keep client.  If None, the new API client's Keep
        client is used (when one was created here), otherwise a KeepClient
        is constructed.  A caller-supplied client is always kept.
    :param install_sig_handlers: whether to install the signal handlers from
        ``arvados.commands._util``.
    :returns: 1 on option/setup errors, otherwise the value returned by
        ``cwltool.main.main``.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")
        return 1

    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
        want_api = _api_for_workflow_uuid(arvargs.update_workflow)
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        # Only override --api when the UUID actually implies an API; an
        # unrecognized UUID type must not discard an explicit --api choice.
        if want_api:
            arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        # No input object on the command line: hand cwltool an empty placeholder.
        job_order_object = ({}, "")

    add_arv_hints()

    # Fill in any cwltool options this parser does not define with
    # cwltool's own defaults.
    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(
                api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
                keep_params={"num_retries": 4})
            # Reuse the new API client's Keep client unless the caller
            # supplied one.
            if keep_client is None:
                keep_client = api_client.keep
            # Issue a request now so connection/credential errors are
            # reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        logger.error(e)
        return 1

    _configure_logging(arvargs)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=executor.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=executor.loadingContext,
                             runtimeContext=executor.runtimeContext)