13306: Changes to arvados-cwl-runner code after running futurize --stage2
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import re
14 import pkg_resources  # part of setuptools
15
16 from schema_salad.sourceline import SourceLine
17 import schema_salad.validate as validate
18 import cwltool.main
19 import cwltool.workflow
20 import cwltool.process
21 import cwltool.argparser
22 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
23 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29 import arvados.commands._util as arv_cmd
30 from arvados.api import OrderedJsonModel
31
32 from .perf import Perf
33 from ._version import __version__
34 from .executor import ArvCwlExecutor
35
36 # These arn't used directly in this file but
37 # other code expects to import them from here
38 from .arvcontainer import ArvadosContainer
39 from .arvjob import ArvadosJob
40 from .arvtool import ArvadosCommandTool
41 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
42 from .util import get_current_container
43 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
44 from .arvworkflow import ArvadosWorkflow
45
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48 logger.setLevel(logging.INFO)
49
50 arvados.log_handler.setFormatter(logging.Formatter(
51         '%(asctime)s %(name)s %(levelname)s: %(message)s',
52         '%Y-%m-%d %H:%M:%S'))
53
54 def versionstring():
55     """Print version string of key packages for provenance and debugging."""
56
57     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
58     arvpkg = pkg_resources.require("arvados-python-client")
59     cwlpkg = pkg_resources.require("cwltool")
60
61     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
62                                     "arvados-python-client", arvpkg[0].version,
63                                     "cwltool", cwlpkg[0].version)
64
65
66 def arg_parser():  # type: () -> argparse.ArgumentParser
67     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
68
69     parser.add_argument("--basedir", type=str,
70                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
71     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
72                         help="Output directory, default current directory")
73
74     parser.add_argument("--eval-timeout",
75                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
76                         type=float,
77                         default=20)
78
79     exgroup = parser.add_mutually_exclusive_group()
80     exgroup.add_argument("--print-dot", action="store_true",
81                          help="Print workflow visualization in graphviz format and exit")
82     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
83     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
84
85     exgroup = parser.add_mutually_exclusive_group()
86     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
87     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
88     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
89
90     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
91
92     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
93
94     exgroup = parser.add_mutually_exclusive_group()
95     exgroup.add_argument("--enable-reuse", action="store_true",
96                         default=True, dest="enable_reuse",
97                         help="Enable job or container reuse (default)")
98     exgroup.add_argument("--disable-reuse", action="store_false",
99                         default=True, dest="enable_reuse",
100                         help="Disable job or container reuse")
101
102     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
103     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
104     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
105     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
106                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
107                         default=False)
108
109     exgroup = parser.add_mutually_exclusive_group()
110     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
111                         default=True, dest="submit")
112     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
113                         default=True, dest="submit")
114     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
115                          dest="create_workflow")
116     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
117     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
118
119     exgroup = parser.add_mutually_exclusive_group()
120     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
121                         default=True, dest="wait")
122     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
123                         default=True, dest="wait")
124
125     exgroup = parser.add_mutually_exclusive_group()
126     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
127                         default=True, dest="log_timestamps")
128     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
129                         default=True, dest="log_timestamps")
130
131     parser.add_argument("--api", type=str,
132                         default=None, dest="work_api",
133                         choices=("jobs", "containers"),
134                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
135
136     parser.add_argument("--compute-checksum", action="store_true", default=False,
137                         help="Compute checksum of contents while collecting outputs",
138                         dest="compute_checksum")
139
140     parser.add_argument("--submit-runner-ram", type=int,
141                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
142                         default=None)
143
144     parser.add_argument("--submit-runner-image", type=str,
145                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
146                         default=None)
147
148     parser.add_argument("--always-submit-runner", action="store_true",
149                         help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
150                         default=False)
151
152     exgroup = parser.add_mutually_exclusive_group()
153     exgroup.add_argument("--submit-request-uuid", type=str,
154                          default=None,
155                          help="Update and commit to supplied container request instead of creating a new one (containers API only).",
156                          metavar="UUID")
157     exgroup.add_argument("--submit-runner-cluster", type=str,
158                          help="Submit workflow runner to a remote cluster (containers API only)",
159                          default=None,
160                          metavar="CLUSTER_ID")
161
162     parser.add_argument("--collection-cache-size", type=int,
163                         default=None,
164                         help="Collection cache size (in MiB, default 256).")
165
166     parser.add_argument("--name", type=str,
167                         help="Name to use for workflow execution instance.",
168                         default=None)
169
170     parser.add_argument("--on-error",
171                         help="Desired workflow behavior when a step fails.  One of 'stop' (do not submit any more steps) or "
172                         "'continue' (may submit other steps that are not downstream from the error). Default is 'continue'.",
173                         default="continue", choices=("stop", "continue"))
174
175     parser.add_argument("--enable-dev", action="store_true",
176                         help="Enable loading and running development versions "
177                              "of CWL spec.", default=False)
178     parser.add_argument('--storage-classes', default="default", type=str,
179                         help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
180
181     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
182                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
183                         default=0)
184
185     parser.add_argument("--priority", type=int,
186                         help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
187                         default=DEFAULT_PRIORITY)
188
189     parser.add_argument("--disable-validate", dest="do_validate",
190                         action="store_false", default=True,
191                         help=argparse.SUPPRESS)
192
193     parser.add_argument("--disable-js-validation",
194                         action="store_true", default=False,
195                         help=argparse.SUPPRESS)
196
197     parser.add_argument("--thread-count", type=int,
198                         default=1, help="Number of threads to use for job submit and output collection.")
199
200     parser.add_argument("--http-timeout", type=int,
201                         default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
202
203     exgroup = parser.add_mutually_exclusive_group()
204     exgroup.add_argument("--trash-intermediate", action="store_true",
205                         default=False, dest="trash_intermediate",
206                          help="Immediately trash intermediate outputs on workflow success.")
207     exgroup.add_argument("--no-trash-intermediate", action="store_false",
208                         default=False, dest="trash_intermediate",
209                         help="Do not trash intermediate outputs (default).")
210
211     parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
212     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
213
214     return parser
215
216 def add_arv_hints():
217     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
218     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
219     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
220     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
221     res.close()
222     cwltool.process.supportedProcessRequirements.extend([
223         "http://arvados.org/cwl#RunInSingleContainer",
224         "http://arvados.org/cwl#OutputDirType",
225         "http://arvados.org/cwl#RuntimeConstraints",
226         "http://arvados.org/cwl#PartitionRequirement",
227         "http://arvados.org/cwl#APIRequirement",
228         "http://commonwl.org/cwltool#LoadListingRequirement",
229         "http://arvados.org/cwl#IntermediateOutput",
230         "http://arvados.org/cwl#ReuseRequirement",
231         "http://arvados.org/cwl#ClusterTarget"
232     ])
233
234 def exit_signal_handler(sigcode, frame):
235     logger.error("Caught signal {}, exiting.".format(sigcode))
236     sys.exit(-sigcode)
237
238 def main(args, stdout, stderr, api_client=None, keep_client=None,
239          install_sig_handlers=True):
240     parser = arg_parser()
241
242     job_order_object = None
243     arvargs = parser.parse_args(args)
244
245     if len(arvargs.storage_classes.strip().split(',')) > 1:
246         logger.error("Multiple storage classes are not supported currently.")
247         return 1
248
249     arvargs.use_container = True
250     arvargs.relax_path_checks = True
251     arvargs.print_supported_versions = False
252
253     if install_sig_handlers:
254         arv_cmd.install_signal_handlers()
255
256     if arvargs.update_workflow:
257         if arvargs.update_workflow.find('-7fd4e-') == 5:
258             want_api = 'containers'
259         elif arvargs.update_workflow.find('-p5p6p-') == 5:
260             want_api = 'jobs'
261         else:
262             want_api = None
263         if want_api and arvargs.work_api and want_api != arvargs.work_api:
264             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
265                 arvargs.update_workflow, want_api, arvargs.work_api))
266             return 1
267         arvargs.work_api = want_api
268
269     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
270         job_order_object = ({}, "")
271
272     add_arv_hints()
273
274     for key, val in list(cwltool.argparser.get_default_args().items()):
275         if not hasattr(arvargs, key):
276             setattr(arvargs, key, val)
277
278     try:
279         if api_client is None:
280             api_client = arvados.safeapi.ThreadSafeApiCache(
281                 api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
282                 keep_params={"num_retries": 4})
283             keep_client = api_client.keep
284             # Make an API object now so errors are reported early.
285             api_client.users().current().execute()
286         if keep_client is None:
287             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
288         executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
289     except Exception as e:
290         logger.error(e)
291         return 1
292
293     if arvargs.debug:
294         logger.setLevel(logging.DEBUG)
295         logging.getLogger('arvados').setLevel(logging.DEBUG)
296
297     if arvargs.quiet:
298         logger.setLevel(logging.WARN)
299         logging.getLogger('arvados').setLevel(logging.WARN)
300         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
301
302     if arvargs.metrics:
303         metrics.setLevel(logging.DEBUG)
304         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
305
306     if arvargs.log_timestamps:
307         arvados.log_handler.setFormatter(logging.Formatter(
308             '%(asctime)s %(name)s %(levelname)s: %(message)s',
309             '%Y-%m-%d %H:%M:%S'))
310     else:
311         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
312
313     return cwltool.main.main(args=arvargs,
314                              stdout=stdout,
315                              stderr=stderr,
316                              executor=executor.arv_executor,
317                              versionfunc=versionstring,
318                              job_order_object=job_order_object,
319                              logger_handler=arvados.log_handler,
320                              custom_schema_callback=add_arv_hints,
321                              loadingContext=executor.loadingContext,
322                              runtimeContext=executor.runtimeContext)