14198: Unit tests pass again
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import re
14 import pkg_resources  # part of setuptools
15
16 from schema_salad.sourceline import SourceLine
17 import schema_salad.validate as validate
18 import cwltool.main
19 import cwltool.workflow
20 import cwltool.process
21 import cwltool.argparser
22 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
23 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29 import arvados.commands._util as arv_cmd
30 from arvados.api import OrderedJsonModel
31
32 from .perf import Perf
33 from ._version import __version__
34 from .executor import ArvCwlExecutor
35
36 # These aren't used directly in this file, but
37 # other code expects to import them from here
38 from .arvcontainer import ArvadosContainer
39 from .arvjob import ArvadosJob
40 from .arvtool import ArvadosCommandTool
41 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
42 from .util import get_current_container
43 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
44 from .arvworkflow import ArvadosWorkflow
45
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# Default log line layout for the shared Arvados log handler:
# "<timestamp> <logger name> <level>: <message>".  main() installs a
# formatter again depending on --log-timestamps / --no-log-timestamps.
_default_log_formatter = logging.Formatter(
    '%(asctime)s %(name)s %(levelname)s: %(message)s',
    '%Y-%m-%d %H:%M:%S')
arvados.log_handler.setFormatter(_default_log_formatter)
51         '%(asctime)s %(name)s %(levelname)s: %(message)s',
52         '%Y-%m-%d %H:%M:%S'))
53
def versionstring():
    """Return a one-line summary of the versions of key packages.

    The result has the form
    ``"<argv[0]> <ver>, arvados-python-client <ver>, cwltool <ver>"`` and is
    meant for provenance and debugging (it is used as the ``--version``
    text).  The first label is the running program's name; its version is
    that of the ``arvados-cwl-runner`` distribution.
    """
    distributions = ("arvados-cwl-runner", "arvados-python-client", "cwltool")
    # Look the distributions up in the same order as before so any
    # resolution error surfaces identically.
    versions = [pkg_resources.require(dist)[0].version for dist in distributions]
    labels = (sys.argv[0], "arvados-python-client", "cwltool")
    return ", ".join("%s %s" % pair for pair in zip(labels, versions))
64
65
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command-line parser for arvados-cwl-runner.

    Returns an ``argparse.ArgumentParser`` with the Arvados-specific options
    plus the positional ``workflow`` and ``job_order`` arguments.  main()
    later fills in any cwltool default options this parser does not define.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    # Evaluated when the parser is built, i.e. the process's working directory at that time.
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    # NOTE(review): versionstring() is called every time the parser is built,
    # not only when --version is given.
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    # Logging verbosity; main() maps --quiet/--debug to logger levels.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Paired on/off flags writing the same dest; the shared default applies
    # when neither is given.  The same pattern is used for several groups below.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    # Mode of operation: submit (default), run locally, or create/update a
    # stored workflow.  --create-template is a deprecated alias writing
    # the same dest as --create-workflow.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    # default=None; main() may derive the API from an --update-workflow UUID.
    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    # NOTE(review): default=None here although the help text says 1024;
    # presumably the 1024 fallback is applied downstream -- confirm.
    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=None)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit-request-uuid", type=str,
                        default=None,
                        help="Update and commit to supplied container request instead of creating a new one (containers API only).")
    exgroup.add_argument("--submit-runner-cluster", type=str,
                        help="Submit toplevel runner to a remote cluster (containers API only)",
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)
    # Accepts a comma-separated list, but main() rejects more than one class.
    parser.add_argument('--storage-classes', default="default", type=str,
                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)

    # Hidden options (help suppressed).
    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    parser.add_argument("--http-timeout", type=int,
                        default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                        default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                        default=False, dest="trash_intermediate",
                        help="Do not trash intermediate outputs (default).")

    # Positional: the workflow document, then every remaining argument is
    # collected verbatim (argparse.REMAINDER) as the job order / inputs.
    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
204
def add_arv_hints():
    """Register Arvados CWL extensions with cwltool.

    Replaces cwltool's ``ACCEPTLIST_EN_RELAXED_RE`` and ``ACCEPTLIST_RE``
    with a match-anything pattern. Loads the bundled ``arv-cwl-schema.yml``
    as the custom CWL v1.0 schema for the ``http://arvados.org/cwl``
    namespace. Adds the Arvados (and cwltool ``LoadListingRequirement``)
    extension requirements to ``cwltool.process.supportedProcessRequirements``.

    main() calls this and also passes it to cwltool as
    ``custom_schema_callback``, so it may run more than once per process.
    Repeated calls do not add duplicate requirement entries.
    """
    # Import explicitly instead of relying on cwltool.main having already
    # loaded this submodule as a side effect.
    import cwltool.command_line_tool

    # NOTE(review): these appear to be cwltool's file-name accept-list
    # patterns; replacing them with ".*" accepts any name (main() also sets
    # relax_path_checks) -- confirm against the cwltool version in use.
    relaxed = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = relaxed
    cwltool.command_line_tool.ACCEPTLIST_RE = relaxed

    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    finally:
        # Always release the resource stream, even if registration fails.
        res.close()

    supported = cwltool.process.supportedProcessRequirements
    for requirement in (
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement",
        "http://arvados.org/cwl#ClusterTarget",
    ):
        # Skip entries already present so repeated calls don't keep
        # growing the shared list with duplicates.
        if requirement not in supported:
            supported.append(requirement)
222
def exit_signal_handler(sigcode, frame):
    """Log the received signal and terminate the process.

    Signal-handler signature: ``sigcode`` is the signal number and ``frame``
    is unused. Exits by raising ``SystemExit(-sigcode)``, as ``sys.exit``
    does.
    """
    message = "Caught signal {}, exiting.".format(sigcode)
    logger.error(message)
    raise SystemExit(-sigcode)
226
def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    """Run arvados-cwl-runner with the given command-line arguments.

    Parses ``args``, validates Arvados-specific options, and builds the API
    client, Keep client and ``ArvCwlExecutor``. It then configures logging
    and delegates to ``cwltool.main.main`` with the Arvados executor and
    contexts plugged in.

    :param args: argument list to parse (program name excluded).
    :param stdout: output stream passed through to cwltool.
    :param stderr: error stream passed through to cwltool.
    :param api_client: optional Arvados API client. When None, a
        ``ThreadSafeApiCache`` is created and exercised with a
        ``users().current()`` call so connection/auth errors surface early.
    :param keep_client: optional Keep client. When None, it is taken from
        the newly created API cache or built with ``KeepClient``.
    :param install_sig_handlers: whether to install the arvados command-line
        signal handlers.
    :returns: 1 on option/setup errors, otherwise the value returned by
        ``cwltool.main.main``.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")
        return 1

    # Fixed cwltool settings that this runner does not expose as options.
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
        # The type infix of the UUID (starting at index 5) identifies which
        # API owns the object being updated.
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        # Only override --api when the UUID actually identifies an API; an
        # unrecognized UUID type must not discard an explicit --api choice.
        if want_api:
            arvargs.work_api = want_api

    # Creating/updating a stored workflow does not need inputs; give cwltool
    # an empty job order instead of making it look for one.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    # Fill in any cwltool options this parser does not define.
    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(
                api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
                keep_params={"num_retries": 4})
            # Honor a caller-supplied keep_client; otherwise reuse the
            # cache's own Keep client.
            if keep_client is None:
                keep_client = api_client.keep
            # Make an API object now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=executor.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=executor.loadingContext,
                             runtimeContext=executor.runtimeContext)