21700: Install Bundler system-wide in Rails postinst
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # the Crunch containers API.
8
9 from future.utils import viewitems
10 from builtins import str
11
12 import argparse
13 import importlib.metadata
14 import importlib.resources
15 import logging
16 import os
17 import sys
18 import re
19
20 from schema_salad.sourceline import SourceLine
21 import schema_salad.validate as validate
22 import cwltool.main
23 import cwltool.workflow
24 import cwltool.process
25 import cwltool.argparser
26 from cwltool.errors import WorkflowException
27 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
28 from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing
29
30 import arvados
31 import arvados.config
32 import arvados.logging
33 from arvados.keep import KeepClient
34 from arvados.errors import ApiError
35 import arvados.commands._util as arv_cmd
36
37 from .perf import Perf
38 from ._version import __version__
39 from .executor import ArvCwlExecutor
40 from .fsaccess import workflow_uuid_pattern
41
42 # These aren't used directly in this file but
43 # other code expects to import them from here
44 from .arvcontainer import ArvadosContainer
45 from .arvtool import ArvadosCommandTool
46 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
47 from .util import get_current_container
48 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
49 from .arvworkflow import ArvadosWorkflow
50
# Logger for the runner's own messages, plus a separate logger for timing
# metrics (main() raises it to DEBUG when --metrics is given).
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

# Timestamped log lines by default; main() installs a formatter again
# according to --log-timestamps / --no-log-timestamps.
arvados.log_handler.setFormatter(
    logging.Formatter(
        fmt='%(asctime)s %(name)s %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
)
58
def versionstring():
    """Return a one-line version summary for provenance and debugging.

    The summary starts with the program path (``sys.argv[0]``) and is
    followed by the installed versions of arvados-cwl-runner,
    arvados-python-client and cwltool, looked up in that order via
    ``importlib.metadata``.  A missing distribution raises
    ``importlib.metadata.PackageNotFoundError``.
    """
    lookup = importlib.metadata.version
    distributions = ('arvados-cwl-runner', 'arvados-python-client', 'cwltool')
    return "{} {}, arvados-python-client {}, cwltool {}".format(
        sys.argv[0],
        *(lookup(dist) for dist in distributions),
    )
67
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command-line parser for arvados-cwl-runner.

    Options are registered in a fixed order, which is also the order they
    are listed in ``--help``.  Each on/off switch pair shares one ``dest``
    and one default, and lives in its own mutually exclusive group so both
    halves cannot be given together.  Options whose help is
    ``argparse.SUPPRESS`` are hidden from ``--help``.

    Building the parser calls versionstring() to produce the ``--version``
    text.
    """
    parser = argparse.ArgumentParser(
        parents=[arv_cmd.retry_opt],
        description='Arvados executor for Common Workflow Language',
    )
    add = parser.add_argument

    def add_switch(on_flag, off_flag, dest, default, on_help, off_help):
        # Register a store_true/store_false pair writing to the same *dest*
        # in a fresh mutually exclusive group; the group is returned so
        # further options can join it.
        group = parser.add_mutually_exclusive_group()
        group.add_argument(on_flag, dest=dest, action="store_true",
                           default=default, help=on_help)
        group.add_argument(off_flag, dest=dest, action="store_false",
                           default=default, help=off_help)
        return group

    # Paths and expression evaluation.
    add("--basedir",
        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    add("--outdir", default=os.path.abspath('.'),
        help="Output directory, default current directory")
    add("--eval-timeout", type=float, default=20,
        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.")

    # Alternative one-shot actions.
    one_shot = parser.add_mutually_exclusive_group()
    one_shot.add_argument("--print-dot", action="store_true",
                          help="Print workflow visualization in graphviz format and exit")
    one_shot.add_argument("--version", action="version", version=versionstring(),
                          help="Print version and exit")
    one_shot.add_argument("--validate", action="store_true",
                          help="Validate CWL document only.")

    # Verbosity.
    verbosity = parser.add_mutually_exclusive_group()
    verbosity.add_argument("--verbose", action="store_true", help="Default logging")
    verbosity.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    verbosity.add_argument("--debug", action="store_true", help="Print even more logging")

    add("--metrics", action="store_true", help="Print timing metrics")
    add("--tool-help", action="store_true", help="Print command line help for tool")

    add_switch("--enable-reuse", "--disable-reuse", "enable_reuse", True,
               "Enable container reuse (default)",
               "Disable container reuse")

    add("--project-uuid", metavar="UUID",
        help="Project that will own the workflow containers, if not provided, will go to home project.")
    add("--output-name", default=None,
        help="Name to use for collection that stores the final output.")
    add("--output-tags", default=None,
        help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.")
    add("--ignore-docker-for-reuse", action="store_true", default=False,
        help="Ignore Docker image version when deciding whether to reuse past containers.")

    # What to do with the workflow: submit/run locally, or a workflow operation.
    mode = add_switch("--submit", "--local", "submit", True,
                      "Submit workflow to run on Arvados.",
                      "Run workflow on local host (submits containers to Arvados).")
    mode.add_argument("--create-template", action="store_true", dest="create_workflow",
                      help="(Deprecated) synonym for --create-workflow.")
    mode.add_argument("--create-workflow", action="store_true",
                      help="Register an Arvados workflow that can be run from Workbench")
    mode.add_argument("--update-workflow", metavar="UUID",
                      help="Update an existing Arvados workflow with the given UUID.")
    mode.add_argument("--print-keep-deps", action="store_true",
                      help="To assist copying, print a list of Keep collections that this workflow depends on.")

    add_switch("--wait", "--no-wait", "wait", True,
               "After submitting workflow runner, wait for completion.",
               "Submit workflow runner and exit.")
    add_switch("--log-timestamps", "--no-log-timestamps", "log_timestamps", True,
               "Prefix logging lines with timestamp",
               "No timestamp on logging lines")

    add("--api", dest="work_api", default=None, choices=("containers",),
        help="Select work submission API.  Only supports 'containers'")
    add("--compute-checksum", dest="compute_checksum", action="store_true", default=False,
        help="Compute checksum of contents while collecting outputs")
    add("--submit-runner-ram", type=int, default=None,
        help="RAM (in MiB) required for the workflow runner job (default 1024)")
    add("--submit-runner-image", default=None,
        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__)
    add("--always-submit-runner", action="store_true", default=False,
        help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool")
    add("--match-submitter-images", dest="match_local_docker", action="store_true", default=False,
        help="Where Arvados has more than one Docker image of the same name, use image from the Docker instance on the submitting node.")

    # Where the runner request goes.
    runner_target = parser.add_mutually_exclusive_group()
    runner_target.add_argument("--submit-request-uuid", metavar="UUID", default=None,
                               help="Update and commit to supplied container request instead of creating a new one.")
    runner_target.add_argument("--submit-runner-cluster", metavar="CLUSTER_ID", default=None,
                               help="Submit workflow runner to a remote cluster")

    add("--collection-cache-size", type=int, default=None,
        help="Collection cache size (in MiB, default 256).")
    add("--name", default=None,
        help="Name to use for workflow execution instance.")
    add("--on-error", default="continue", choices=("stop", "continue"),
        help="Desired workflow behavior when a step fails.  One of 'stop' (do not submit any more steps) or "
             "'continue' (may submit other steps that are not downstream from the error). Default is 'continue'.")
    add("--enable-dev", action="store_true", default=False,
        help="Enable loading and running development versions "
             "of the CWL standards.")
    add('--storage-classes', default="default",
        help="Specify comma separated list of storage classes to be used when saving final workflow output to Keep.")
    add('--intermediate-storage-classes', default="default",
        help="Specify comma separated list of storage classes to be used when saving intermediate workflow output to Keep.")
    add("--intermediate-output-ttl", type=int, metavar="N", default=0,
        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).")
    add("--priority", type=int, default=DEFAULT_PRIORITY,
        help="Workflow priority (range 1..1000, higher has precedence over lower)")

    # Hidden options (help suppressed).
    add("--disable-validate", dest="do_validate", action="store_false", default=True,
        help=argparse.SUPPRESS)
    add("--disable-git", dest="git_info", action="store_false", default=True,
        help=argparse.SUPPRESS)
    add("--disable-color", dest="enable_color", action="store_false", default=True,
        help=argparse.SUPPRESS)
    add("--disable-js-validation", action="store_true", default=False,
        help=argparse.SUPPRESS)
    add("--fast-parser", dest="fast_parser", action="store_true", default=False,
        help=argparse.SUPPRESS)

    add("--thread-count", type=int, default=0,
        help="Number of threads to use for job submit and output collection.")
    add("--http-timeout", dest="http_timeout", type=int, default=5*60,
        help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
    add("--defer-downloads", action="store_true", default=False,
        help="When submitting a workflow, defer downloading HTTP URLs to workflow launch instead of downloading to Keep before submit.")
    add("--varying-url-params", type=str, default="",
        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
    add("--prefer-cached-downloads", action="store_true", default=False,
        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")

    add_switch("--enable-preemptible", "--disable-preemptible", "enable_preemptible", None,
               "Use preemptible instances. Control individual steps with arv:UsePreemptible hint.",
               "Don't use preemptible instances.")
    add_switch("--copy-deps", "--no-copy-deps", "copy_deps", None,
               "Copy dependencies into the destination project.",
               "Leave dependencies where they are.")

    add("--skip-schemas", dest="skip_schemas", action="store_true", default=False,
        help="Skip loading of schemas")

    add_switch("--trash-intermediate", "--no-trash-intermediate", "trash_intermediate", False,
               "Immediately trash intermediate outputs on workflow success.",
               "Do not trash intermediate outputs (default).")
    add_switch("--enable-usage-report", "--disable-usage-report", "enable_usage_report", None,
               "Create usage_report.html with a summary of each step's resource usage.",
               "Disable usage report.")

    # Positionals: the workflow, then everything else as the job order.
    add("workflow", default=None, help="The workflow to execute")
    add("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
269
def add_arv_hints():
    """Register Arvados CWL extensions with cwltool.

    * Replaces cwltool's ``ACCEPTLIST_EN_RELAXED_RE`` and ``ACCEPTLIST_RE``
      with a single compiled pattern that matches any string.
    * For each supported CWL version, reads the packaged
      ``arv-cwl-schema-<version>.yml`` resource and registers it with
      ``use_custom_schema`` under the ``http://arvados.org/cwl`` namespace.
    * Adds the Arvados (and selected cwltool) requirement URIs to
      ``cwltool.process.supportedProcessRequirements``.

    main() calls this directly and also hands it to cwltool as
    ``custom_schema_callback``, so it can run several times in one process.
    A URI already present in the supported-requirements list is not
    appended again, so repeated calls do not keep growing the list.
    """
    match_anything = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = match_anything
    cwltool.command_line_tool.ACCEPTLIST_RE = match_anything

    supported_versions = ["v1.0", "v1.1", "v1.2"]
    for s in supported_versions:
        # encoding must be passed by keyword: since Python 3.13, read_text()
        # treats extra positional arguments as additional path names and
        # raises TypeError when 'utf-8' is given positionally.
        customschema = importlib.resources.read_text(
            __name__, f'arv-cwl-schema-{s}.yml', encoding='utf-8')
        use_custom_schema(s, "http://arvados.org/cwl", customschema)

    supported = cwltool.process.supportedProcessRequirements
    for uri in (
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement",
        "http://arvados.org/cwl#ClusterTarget",
        "http://arvados.org/cwl#OutputStorageClass",
        "http://arvados.org/cwl#ProcessProperties",
        "http://commonwl.org/cwltool#CUDARequirement",
        "http://arvados.org/cwl#UsePreemptible",
        "http://arvados.org/cwl#OutputCollectionProperties",
        "http://arvados.org/cwl#KeepCacheTypeRequirement",
        "http://arvados.org/cwl#OutOfMemoryRetry",
    ):
        # Avoid duplicate entries when this function runs more than once.
        if uri not in supported:
            supported.append(uri)
295
def exit_signal_handler(sigcode, frame):
    """Log the received signal number and terminate the process.

    The ``(signum, frame)`` signature matches a ``signal`` module handler;
    *frame* is unused.  Raises ``SystemExit`` with code ``-sigcode``.
    """
    message = "Caught signal {}, exiting.".format(sigcode)
    logger.error(message)
    sys.exit(-sigcode)
299
def _make_executor(arvargs, api_client, keep_client, stdout):
    """Build the ArvCwlExecutor, creating API/Keep clients when not supplied.

    If *api_client* is None, a ThreadSafeApiCache is created from the
    ``--retries`` and ``--http-timeout`` options, and a ``users().current()``
    request is made immediately so that errors are reported early.  If
    *keep_client* is None it becomes that new client's ``keep`` attribute,
    or, when the caller supplied *api_client*, a new KeepClient with a
    disk-backed block cache.  A *keep_client* passed by the caller is always
    used as given.

    Exceptions from client or executor construction propagate to the caller.
    """
    if api_client is None:
        api_client = arvados.safeapi.ThreadSafeApiCache(
            api_params={
                'num_retries': arvargs.retries,
                'timeout': arvargs.http_timeout,
            },
            keep_params={
                'num_retries': arvargs.retries,
            },
            version='v1',
        )
        # Previously this unconditionally overwrote keep_client, silently
        # discarding one passed in by the caller.
        if keep_client is None:
            keep_client = api_client.keep
        # Make an API call now so errors are reported early.
        api_client.users().current().execute()
    if keep_client is None:
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True)
        keep_client = arvados.keep.KeepClient(
            api_client=api_client,
            block_cache=block_cache,
            num_retries=arvargs.retries,
        )
    return ArvCwlExecutor(
        api_client,
        arvargs,
        keep_client=keep_client,
        num_retries=arvargs.retries,
        stdout=stdout,
    )


def _configure_logging(arvargs):
    """Set logger levels and the log line format from the parsed options.

    The runner and ``arvados`` loggers are set to INFO (instead of the
    default, WARNING); ``arvados.keep`` and ``googleapiclient.http`` stay at
    WARNING.  ``--debug`` lowers levels to DEBUG, ``--quiet`` raises them to
    WARNING, ``--metrics`` enables the timing loggers, and
    ``--log-timestamps``/``--no-log-timestamps`` selects the formatter
    installed on ``arvados.log_handler``.
    """
    logger.setLevel(logging.INFO)
    logging.getLogger('arvados').setLevel(logging.INFO)
    logging.getLogger('arvados.keep').setLevel(logging.WARNING)
    # API retries are filtered to the INFO level and can be noisy, but as long as
    # they succeed we don't need to see warnings about it.
    googleapiclient_http_logger = logging.getLogger('googleapiclient.http')
    googleapiclient_http_logger.addFilter(arvados.logging.GoogleHTTPClientFilter())
    googleapiclient_http_logger.setLevel(logging.WARNING)

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)
        # In debug mode show logs about retries, but we aren't
        # debugging the google client so we don't need to see
        # everything.
        googleapiclient_http_logger.setLevel(logging.NOTSET)
        logging.getLogger('googleapiclient').setLevel(logging.INFO)

    if arvargs.quiet:
        # logging.WARNING (same value as the deprecated alias logging.WARN).
        logger.setLevel(logging.WARNING)
        logging.getLogger('arvados').setLevel(logging.WARNING)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARNING)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))


def main(args=sys.argv[1:],
         stdout=sys.stdout,
         stderr=sys.stderr,
         api_client=None,
         keep_client=None,
         install_sig_handlers=True):
    """Run arvados-cwl-runner with the given command-line arguments.

    :param args: command-line arguments, without the program name.
    :param stdout: output stream.  When it is ``sys.stdout`` it is handed to
        cwltool as None so cwltool applies its own UTF-8 handling.
    :param stderr: stream passed through to ``cwltool.main.main``.
    :param api_client: Arvados API client to use; created if None.
    :param keep_client: Keep client to use; created if None.  A client
        passed here is used even when *api_client* is None.
    :param install_sig_handlers: install the arvados signal handlers first.
    :returns: 1 if ``--update-workflow`` conflicts with ``--api`` or the
        executor cannot be created; otherwise the value returned by
        ``cwltool.main.main``.

    The defaults for *args*, *stdout* and *stderr* are evaluated once, when
    this module is imported.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    # cwltool options that arvados-cwl-runner always forces.
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
        # A UUID whose '-7fd4e-' infix starts at index 5 uses the
        # containers API; anything else leaves the API unset.
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    workflow_op = arvargs.create_workflow or arvargs.update_workflow or arvargs.print_keep_deps

    # For a workflow operation with no job order given, pass cwltool an
    # empty one.
    if workflow_op and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    # Fill in any cwltool option that the Arvados parser does not define.
    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    try:
        executor = _make_executor(arvargs, api_client, keep_client, stdout)
    except WorkflowException as e:
        logger.error(e, exc_info=(sys.exc_info()[1] if arvargs.debug else False))
        return 1
    except Exception:
        logger.exception("Error creating the Arvados CWL Executor")
        return 1

    # Note that unless in debug mode, some stack traces related to user
    # workflow errors may be suppressed.
    _configure_logging(arvargs)

    if stdout is sys.stdout:
        # cwltool.main has code to work around encoding issues with
        # sys.stdout and unix pipes (they default to ASCII encoding,
        # we want utf-8), so when stdout is sys.stdout set it to None
        # to take advantage of that.  Don't override it for all cases
        # since we still want to be able to capture stdout for the
        # unit tests.
        stdout = None

    executor.loadingContext.default_docker_image = arvargs.submit_runner_image or "arvados/jobs:"+__version__

    # For arvwf:/keep: references or workflow UUIDs, skip document
    # validation; when submitting without a workflow operation, use the
    # executor's fast_submit path.
    if (arvargs.workflow.startswith(("arvwf:", "keep:"))
            or workflow_uuid_pattern.match(arvargs.workflow)):
        executor.loadingContext.do_validate = False
        if arvargs.submit and not workflow_op:
            executor.fast_submit = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=executor.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=executor.loadingContext,
                             runtimeContext=executor.toplevel_runtimeContext,
                             input_required=not workflow_op)