13306: test_submit additional fixes for py2 unicode support
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 from builtins import str
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import re
16 import pkg_resources  # part of setuptools
17
18 from schema_salad.sourceline import SourceLine
19 import schema_salad.validate as validate
20 import cwltool.main
21 import cwltool.workflow
22 import cwltool.process
23 import cwltool.argparser
24 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
25 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
26
27 import arvados
28 import arvados.config
29 from arvados.keep import KeepClient
30 from arvados.errors import ApiError
31 import arvados.commands._util as arv_cmd
32 from arvados.api import OrderedJsonModel
33
34 from .perf import Perf
35 from ._version import __version__
36 from .executor import ArvCwlExecutor
37
38 # These aren't used directly in this file but
39 # other code expects to import them from here
40 from .arvcontainer import ArvadosContainer
41 from .arvjob import ArvadosJob
42 from .arvtool import ArvadosCommandTool
43 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
44 from .util import get_current_container
45 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
46 from .arvworkflow import ArvadosWorkflow
47
# Module-level loggers: one for the runner itself (INFO by default) and
# one dedicated to timing metrics.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# Install a timestamped line format on the Arvados log handler at import
# time.
arvados.log_handler.setFormatter(
    logging.Formatter(
        fmt='%(asctime)s %(name)s %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'))
55
def versionstring():
    """Return a one-line version summary of the key packages.

    The result names the invoking program (``sys.argv[0]``) together with
    the installed arvados-cwl-runner version, followed by the installed
    versions of arvados-python-client and cwltool.  It is intended for
    provenance and debugging output.
    """
    # Resolve all three distributions first so a missing package is
    # reported before any formatting happens.
    runner_dists = pkg_resources.require("arvados-cwl-runner")
    client_dists = pkg_resources.require("arvados-python-client")
    cwltool_dists = pkg_resources.require("cwltool")

    labelled_versions = (
        (sys.argv[0], runner_dists[0].version),
        ("arvados-python-client", client_dists[0].version),
        ("cwltool", cwltool_dists[0].version),
    )
    return ", ".join("%s %s" % pair for pair in labelled_versions)
66
67
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command line parser for the Arvados CWL runner.

    Options are registered in the order they appear in ``--help``.  Pairs
    of on/off switches share one ``dest`` and live in a mutually exclusive
    group so that only one of them may be given.

    Note that the ``--version`` text is computed by calling
    ``versionstring()`` while the parser is being built, not when the
    option is used.

    Returns:
        The fully configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    # Path handling.
    parser.add_argument(
        "--basedir", type=str,
        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument(
        "--outdir", type=str, default=os.path.abspath('.'),
        help="Output directory, default current directory")

    parser.add_argument(
        "--eval-timeout", type=float, default=20,
        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.")

    # Actions that inspect the workflow instead of running it.
    inspect_group = parser.add_mutually_exclusive_group()
    inspect_group.add_argument(
        "--print-dot", action="store_true",
        help="Print workflow visualization in graphviz format and exit")
    inspect_group.add_argument(
        "--version", action="version", version=versionstring(),
        help="Print version and exit")
    inspect_group.add_argument(
        "--validate", action="store_true",
        help="Validate CWL document only.")

    # Log verbosity.
    verbosity_group = parser.add_mutually_exclusive_group()
    verbosity_group.add_argument(
        "--verbose", action="store_true", help="Default logging")
    verbosity_group.add_argument(
        "--quiet", action="store_true", help="Only print warnings and errors.")
    verbosity_group.add_argument(
        "--debug", action="store_true", help="Print even more logging")

    parser.add_argument(
        "--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument(
        "--tool-help", action="store_true", help="Print command line help for tool")

    # Job/container reuse switch (on by default).
    reuse_group = parser.add_mutually_exclusive_group()
    reuse_group.add_argument(
        "--enable-reuse", dest="enable_reuse",
        action="store_true", default=True,
        help="Enable job or container reuse (default)")
    reuse_group.add_argument(
        "--disable-reuse", dest="enable_reuse",
        action="store_false", default=True,
        help="Disable job or container reuse")

    parser.add_argument(
        "--project-uuid", type=str, metavar="UUID",
        help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument(
        "--output-name", type=str, default=None,
        help="Name to use for collection that stores the final output.")
    parser.add_argument(
        "--output-tags", type=str, default=None,
        help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.")
    parser.add_argument(
        "--ignore-docker-for-reuse", action="store_true", default=False,
        help="Ignore Docker image version when deciding whether to reuse past jobs.")

    # What to do with the workflow: submit, run locally, or register it.
    mode_group = parser.add_mutually_exclusive_group()
    mode_group.add_argument(
        "--submit", dest="submit", action="store_true", default=True,
        help="Submit workflow to run on Arvados.")
    mode_group.add_argument(
        "--local", dest="submit", action="store_false", default=True,
        help="Run workflow on local host (submits jobs to Arvados).")
    mode_group.add_argument(
        "--create-template", dest="create_workflow", action="store_true",
        help="(Deprecated) synonym for --create-workflow.")
    mode_group.add_argument(
        "--create-workflow", action="store_true",
        help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    mode_group.add_argument(
        "--update-workflow", metavar="UUID",
        help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    # Whether to block until the submitted runner finishes.
    wait_group = parser.add_mutually_exclusive_group()
    wait_group.add_argument(
        "--wait", dest="wait", action="store_true", default=True,
        help="After submitting workflow runner job, wait for completion.")
    wait_group.add_argument(
        "--no-wait", dest="wait", action="store_false", default=True,
        help="Submit workflow runner job and exit.")

    # Timestamp prefix on log lines.
    timestamp_group = parser.add_mutually_exclusive_group()
    timestamp_group.add_argument(
        "--log-timestamps", dest="log_timestamps",
        action="store_true", default=True,
        help="Prefix logging lines with timestamp")
    timestamp_group.add_argument(
        "--no-log-timestamps", dest="log_timestamps",
        action="store_false", default=True,
        help="No timestamp on logging lines")

    parser.add_argument(
        "--api", type=str, dest="work_api", default=None,
        choices=("jobs", "containers"),
        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument(
        "--compute-checksum", dest="compute_checksum",
        action="store_true", default=False,
        help="Compute checksum of contents while collecting outputs")

    parser.add_argument(
        "--submit-runner-ram", type=int, default=None,
        help="RAM (in MiB) required for the workflow runner job (default 1024)")

    parser.add_argument(
        "--submit-runner-image", type=str, default=None,
        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__)

    parser.add_argument(
        "--always-submit-runner", action="store_true", default=False,
        help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool")

    # Where the runner is submitted (containers API only).
    target_group = parser.add_mutually_exclusive_group()
    target_group.add_argument(
        "--submit-request-uuid", type=str, default=None, metavar="UUID",
        help="Update and commit to supplied container request instead of creating a new one (containers API only).")
    target_group.add_argument(
        "--submit-runner-cluster", type=str, default=None, metavar="CLUSTER_ID",
        help="Submit workflow runner to a remote cluster (containers API only)")

    parser.add_argument(
        "--collection-cache-size", type=int, default=None,
        help="Collection cache size (in MiB, default 256).")

    parser.add_argument(
        "--name", type=str, default=None,
        help="Name to use for workflow execution instance.")

    parser.add_argument(
        "--on-error", default="continue", choices=("stop", "continue"),
        help="Desired workflow behavior when a step fails.  One of 'stop' (do not submit any more steps) or "
             "'continue' (may submit other steps that are not downstream from the error). Default is 'continue'.")

    parser.add_argument(
        "--enable-dev", action="store_true", default=False,
        help="Enable loading and running development versions "
             "of CWL spec.")
    parser.add_argument(
        '--storage-classes', type=str, default="default",
        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument(
        "--intermediate-output-ttl", type=int, metavar="N", default=0,
        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).")

    parser.add_argument(
        "--priority", type=int, default=DEFAULT_PRIORITY,
        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)")

    # Hidden switches: accepted by the parser but suppressed from --help.
    parser.add_argument(
        "--disable-validate", dest="do_validate",
        action="store_false", default=True,
        help=argparse.SUPPRESS)

    parser.add_argument(
        "--disable-js-validation",
        action="store_true", default=False,
        help=argparse.SUPPRESS)

    parser.add_argument(
        "--thread-count", type=int, default=1,
        help="Number of threads to use for job submit and output collection.")

    parser.add_argument(
        "--http-timeout", type=int, dest="http_timeout", default=5*60,
        help="API request timeout in seconds. Default is 300 seconds (5 minutes).")

    # Trashing of intermediate outputs (off by default).
    trash_group = parser.add_mutually_exclusive_group()
    trash_group.add_argument(
        "--trash-intermediate", dest="trash_intermediate",
        action="store_true", default=False,
        help="Immediately trash intermediate outputs on workflow success.")
    trash_group.add_argument(
        "--no-trash-intermediate", dest="trash_intermediate",
        action="store_false", default=False,
        help="Do not trash intermediate outputs (default).")

    # Positionals: the workflow document, then everything after it is
    # collected verbatim as the job order.
    parser.add_argument(
        "workflow", type=str, default=None,
        help="The workflow to execute")
    parser.add_argument(
        "job_order", nargs=argparse.REMAINDER,
        help="The input object to the workflow.")

    return parser
217
def add_arv_hints():
    """Register the Arvados CWL extensions with cwltool.

    Performs three module-level side effects on cwltool:

    * replaces both ``ACCEPTLIST`` patterns in ``cwltool.command_line_tool``
      with a regular expression that matches everything;
    * loads the packaged ``arv-cwl-schema.yml`` as a custom schema for CWL
      "v1.0" under the ``http://arvados.org/cwl`` namespace;
    * adds the Arvados requirement URIs to
      ``cwltool.process.supportedProcessRequirements``.

    The function is safe to call repeatedly: ``main()`` calls it directly
    and also hands it to cwltool as ``custom_schema_callback``, so the
    requirement list is only extended with URIs that are not already
    present.
    """
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE

    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    finally:
        # Release the stream even if reading or schema registration fails.
        res.close()

    arv_requirements = [
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement",
        "http://arvados.org/cwl#ClusterTarget"
    ]
    supported = cwltool.process.supportedProcessRequirements
    # Build the list of missing entries first so repeated calls do not
    # accumulate duplicates in cwltool's shared list.
    missing = [req for req in arv_requirements if req not in supported]
    supported.extend(missing)
235
def exit_signal_handler(sigcode, frame):
    """Log the signal that was caught, then exit the process.

    The exit status passed to ``sys.exit`` is the negated signal number.
    ``frame`` is accepted but not used.
    """
    message = str(u"Caught signal {}, exiting.").format(sigcode)
    logger.error(message)
    sys.exit(-sigcode)
239
def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    """Parse the command line, set up Arvados access, and run cwltool.

    Args:
        args: Command line arguments, handed to the parser built by
            ``arg_parser()``.
        stdout: Passed through to ``cwltool.main.main``.
        stderr: Passed through to ``cwltool.main.main``.
        api_client: Arvados API client to use.  When None, a
            ``ThreadSafeApiCache`` is created, its ``keep`` attribute is
            used as the Keep client, and the current user is fetched so
            that API errors surface early.
        keep_client: Keep client to use together with a supplied
            ``api_client``.  When None, a ``KeepClient`` is created.
        install_sig_handlers: When true, install the Arvados command
            signal handlers before doing any work.

    Returns:
        1 when argument checks or executor setup fail; otherwise the
        return value of ``cwltool.main.main``.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error(str(u"Multiple storage classes are not supported currently."))
        return 1

    # Fixed settings applied on top of the parsed options.
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
        # The infix found at offset 5 of the UUID selects the API that the
        # existing object belongs to.
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error(str(u'--update-workflow arg {!r} uses {!r} API, but --api={!r} specified').format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        # Only override when the UUID actually identified an API, so an
        # explicit --api choice is not discarded for an unrecognized UUID.
        if want_api:
            arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        # Registering or updating a workflow does not need an input
        # object, so supply an empty one.
        job_order_object = ({}, "")

    add_arv_hints()

    # Fill in every cwltool option that our own parser does not define.
    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(
                api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
                keep_params={"num_retries": 4})
            keep_client = api_client.keep
            # Make an API object now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        # Top-level boundary: report any setup failure and exit non-zero.
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=executor.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=executor.loadingContext,
                             runtimeContext=executor.runtimeContext)