14198: Call validate_cluster_target on --submit-runner-cluster
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import re
14 import pkg_resources  # part of setuptools
15
16 from schema_salad.sourceline import SourceLine
17 import schema_salad.validate as validate
18 import cwltool.main
19 import cwltool.workflow
20 import cwltool.process
21 import cwltool.argparser
22 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
23 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29 import arvados.commands._util as arv_cmd
30 from arvados.api import OrderedJsonModel
31
32 from .perf import Perf
33 from ._version import __version__
34 from .executor import ArvCwlExecutor
35
36 # These aren't used directly in this file but
37 # other code expects to import them from here
38 from .arvcontainer import ArvadosContainer
39 from .arvjob import ArvadosJob
40 from .arvtool import ArvadosCommandTool
41 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
42 from .util import get_current_container
43 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
44 from .arvworkflow import ArvadosWorkflow
45
# General-purpose logger for the runner; INFO is the baseline level and
# main() adjusts it for --debug / --quiet.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Separate logger for timing metrics (main() sets it to DEBUG for --metrics).
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# Timestamped log lines by default; main() swaps the formatter when
# --no-log-timestamps is given.
arvados.log_handler.setFormatter(
    logging.Formatter(fmt='%(asctime)s %(name)s %(levelname)s: %(message)s',
                      datefmt='%Y-%m-%d %H:%M:%S'))
53
def versionstring():
    """Return a version string of key packages for provenance and debugging.

    The string names the invoking program (sys.argv[0]) with the
    arvados-cwl-runner version, followed by the versions of
    arvados-python-client and cwltool.
    """
    # Resolve all three distributions up front, in the same order as the
    # fields appear in the output.
    runner_dists = pkg_resources.require("arvados-cwl-runner")
    client_dists = pkg_resources.require("arvados-python-client")
    cwltool_dists = pkg_resources.require("cwltool")

    fields = (
        sys.argv[0], runner_dists[0].version,
        "arvados-python-client", client_dists[0].version,
        "cwltool", cwltool_dists[0].version,
    )
    return "%s %s, %s %s, %s %s" % fields
64
65
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command line parser for the Arvados CWL runner.

    Only constructs and returns the argparse.ArgumentParser; no arguments
    are parsed here.  Note that the --version text is computed eagerly via
    versionstring() while the parser is being built.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    def add_toggle(dest, default, on_flag, on_help, off_flag, off_help):
        # Register a mutually exclusive store_true/store_false pair that
        # both write to the same destination attribute.
        pair = parser.add_mutually_exclusive_group()
        pair.add_argument(on_flag, dest=dest, action="store_true",
                          default=default, help=on_help)
        pair.add_argument(off_flag, dest=dest, action="store_false",
                          default=default, help=off_help)

    parser.add_argument(
        "--basedir", type=str,
        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument(
        "--outdir", type=str, default=os.path.abspath('.'),
        help="Output directory, default current directory")

    parser.add_argument(
        "--eval-timeout", type=float, default=20,
        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.")

    # Actions that stop short of running the workflow.
    action_group = parser.add_mutually_exclusive_group()
    action_group.add_argument(
        "--print-dot", action="store_true",
        help="Print workflow visualization in graphviz format and exit")
    action_group.add_argument(
        "--version", action="version", version=versionstring(),
        help="Print version and exit")
    action_group.add_argument(
        "--validate", action="store_true",
        help="Validate CWL document only.")

    # Logging verbosity.
    verbosity_group = parser.add_mutually_exclusive_group()
    verbosity_group.add_argument(
        "--verbose", action="store_true", help="Default logging")
    verbosity_group.add_argument(
        "--quiet", action="store_true", help="Only print warnings and errors.")
    verbosity_group.add_argument(
        "--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    add_toggle("enable_reuse", True,
               "--enable-reuse", "Enable job or container reuse (default)",
               "--disable-reuse", "Disable job or container reuse")

    parser.add_argument(
        "--project-uuid", type=str, metavar="UUID",
        help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument(
        "--output-name", type=str, default=None,
        help="Name to use for collection that stores the final output.")
    parser.add_argument(
        "--output-tags", type=str, default=None,
        help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.")
    parser.add_argument(
        "--ignore-docker-for-reuse", action="store_true", default=False,
        help="Ignore Docker image version when deciding whether to reuse past jobs.")

    # How the workflow is dispatched (or registered instead of run).
    mode_group = parser.add_mutually_exclusive_group()
    mode_group.add_argument(
        "--submit", action="store_true", default=True, dest="submit",
        help="Submit workflow to run on Arvados.")
    mode_group.add_argument(
        "--local", action="store_false", default=True, dest="submit",
        help="Run workflow on local host (submits jobs to Arvados).")
    mode_group.add_argument(
        "--create-template", action="store_true", dest="create_workflow",
        help="(Deprecated) synonym for --create-workflow.")
    mode_group.add_argument(
        "--create-workflow", action="store_true",
        help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    mode_group.add_argument(
        "--update-workflow", type=str, metavar="UUID",
        help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    add_toggle("wait", True,
               "--wait", "After submitting workflow runner job, wait for completion.",
               "--no-wait", "Submit workflow runner job and exit.")

    add_toggle("log_timestamps", True,
               "--log-timestamps", "Prefix logging lines with timestamp",
               "--no-log-timestamps", "No timestamp on logging lines")

    parser.add_argument(
        "--api", type=str, dest="work_api", default=None,
        choices=("jobs", "containers"),
        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument(
        "--compute-checksum", action="store_true", default=False,
        dest="compute_checksum",
        help="Compute checksum of contents while collecting outputs")

    parser.add_argument(
        "--submit-runner-ram", type=int, default=None,
        help="RAM (in MiB) required for the workflow runner job (default 1024)")

    parser.add_argument(
        "--submit-runner-image", type=str, default=None,
        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__)

    parser.add_argument(
        "--always-submit-runner", action="store_true", default=False,
        help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool")

    # Where the runner request goes; the two options exclude each other.
    target_group = parser.add_mutually_exclusive_group()
    target_group.add_argument(
        "--submit-request-uuid", type=str, default=None, metavar="UUID",
        help="Update and commit to supplied container request instead of creating a new one (containers API only).")
    target_group.add_argument(
        "--submit-runner-cluster", type=str, default=None, metavar="CLUSTER_ID",
        help="Submit workflow runner to a remote cluster (containers API only)")

    parser.add_argument(
        "--name", type=str, default=None,
        help="Name to use for workflow execution instance.")

    parser.add_argument(
        "--on-error", type=str, default="continue", choices=("stop", "continue"),
        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
             "Default is 'continue'.")

    parser.add_argument(
        "--enable-dev", action="store_true", default=False,
        help="Enable loading and running development versions "
             "of CWL spec.")
    parser.add_argument(
        '--storage-classes', type=str, default="default",
        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument(
        "--intermediate-output-ttl", type=int, metavar="N", default=0,
        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).")

    parser.add_argument(
        "--priority", type=int, default=DEFAULT_PRIORITY,
        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)")

    # The next two switches are hidden from --help output.
    parser.add_argument(
        "--disable-validate", dest="do_validate",
        action="store_false", default=True,
        help=argparse.SUPPRESS)

    parser.add_argument(
        "--disable-js-validation",
        action="store_true", default=False,
        help=argparse.SUPPRESS)

    parser.add_argument(
        "--thread-count", type=int, default=4,
        help="Number of threads to use for job submit and output collection.")

    parser.add_argument(
        "--http-timeout", type=int, default=5*60, dest="http_timeout",
        help="API request timeout in seconds. Default is 300 seconds (5 minutes).")

    add_toggle("trash_intermediate", False,
               "--trash-intermediate", "Immediately trash intermediate outputs on workflow success.",
               "--no-trash-intermediate", "Do not trash intermediate outputs (default).")

    # Positionals: the workflow document, then everything else verbatim.
    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
210
def add_arv_hints():
    """Register the Arvados CWL extensions with cwltool.

    Relaxes cwltool's file name accept-list patterns to match anything,
    loads the bundled 'arv-cwl-schema.yml' as a custom schema for CWL
    v1.0, and adds the Arvados-specific requirement URIs to cwltool's list
    of supported process requirements.

    main() calls this directly and also passes it to cwltool as
    custom_schema_callback, so it may run more than once; requirement URIs
    already present in the list are therefore not appended again.
    """
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE

    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    finally:
        # Release the stream even when reading or schema registration fails.
        res.close()

    supported = cwltool.process.supportedProcessRequirements
    arv_requirements = [
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement",
        "http://arvados.org/cwl#ClusterTarget"
    ]
    # Compute the missing entries first so the list is not mutated while
    # it is being searched.
    missing = [req for req in arv_requirements if req not in supported]
    supported.extend(missing)
228
def exit_signal_handler(sigcode, frame):
    """Log the received signal and exit with the negated signal number.

    Has the (signal number, stack frame) signature of a signal handler;
    the frame argument is not used.
    """
    message = "Caught signal {}, exiting.".format(sigcode)
    logger.error(message)
    exit_status = -sigcode
    sys.exit(exit_status)
232
def _api_for_workflow_uuid(uuid):
    """Return the work API implied by an --update-workflow UUID, or None.

    A '-7fd4e-' infix at string offset 5 selects 'containers' and a
    '-p5p6p-' infix at the same offset selects 'jobs'; anything else is
    unrecognized and yields None.
    """
    if uuid.find('-7fd4e-') == 5:
        return 'containers'
    if uuid.find('-p5p6p-') == 5:
        return 'jobs'
    return None

def _configure_logging(arvargs):
    """Apply --debug, --quiet, --metrics and --log-timestamps to the loggers."""
    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    """Parse the command line, build the Arvados executor and run cwltool.

    Arguments:
      args -- list of command line arguments, parsed with arg_parser().
      stdout, stderr -- streams handed through to cwltool.main.main().
      api_client -- Arvados API client to use.  When None, a
          ThreadSafeApiCache is created and queried once for the current
          user so that connection problems are reported early.
      keep_client -- Keep client to use.  When still None after the API
          client has been set up, a KeepClient is created.
      install_sig_handlers -- when true, install the signal handlers from
          arvados.commands._util.

    Returns 1 when argument checks or client/executor setup fail,
    otherwise whatever cwltool.main.main() returns.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")
        return 1

    # Fixed settings that the command line does not expose.
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
        want_api = _api_for_workflow_uuid(arvargs.update_workflow)
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        # Only override the API when the UUID actually identifies one;
        # otherwise an explicit --api choice would be discarded.
        if want_api:
            arvargs.work_api = want_api

    # Creating or updating a workflow without an input object: supply an
    # empty job order.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    # Fill in every cwltool default that our own parser does not define.
    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(
                api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
                keep_params={"num_retries": 4})
            # NOTE(review): this replaces any keep_client the caller passed
            # when api_client was omitted -- confirm that is intended.
            keep_client = api_client.keep
            # Make an API object now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        logger.error(e)
        return 1

    _configure_logging(arvargs)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=executor.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=executor.loadingContext,
                             runtimeContext=executor.runtimeContext)