2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
9 from future.utils import viewitems
10 from builtins import str
17 import pkg_resources # part of setuptools
19 from schema_salad.sourceline import SourceLine
20 import schema_salad.validate as validate
22 import cwltool.workflow
23 import cwltool.process
24 import cwltool.argparser
25 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
26 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
32 import arvados.commands._util as arv_cmd
33 from arvados.api import OrderedJsonModel
35 from .perf import Perf
36 from ._version import __version__
37 from .executor import ArvCwlExecutor
39 # These aren't used directly in this file but
40 # other code expects to import them from here
41 from .arvcontainer import ArvadosContainer
42 from .arvjob import ArvadosJob
43 from .arvtool import ArvadosCommandTool
44 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
45 from .util import get_current_container
46 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
47 from .arvworkflow import ArvadosWorkflow
49 logger = logging.getLogger('arvados.cwl-runner')
50 metrics = logging.getLogger('arvados.cwl-runner.metrics')
51 logger.setLevel(logging.INFO)
53 arvados.log_handler.setFormatter(logging.Formatter(
54 '%(asctime)s %(name)s %(levelname)s: %(message)s',
58 """Return a version string of key packages for provenance and debugging."""
60 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
61 arvpkg = pkg_resources.require("arvados-python-client")
62 cwlpkg = pkg_resources.require("cwltool")
64 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
65 "arvados-python-client", arvpkg[0].version,
66 "cwltool", cwlpkg[0].version)
69 def arg_parser(): # type: () -> argparse.ArgumentParser
70 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
72 parser.add_argument("--basedir",
73 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
74 parser.add_argument("--outdir", default=os.path.abspath('.'),
75 help="Output directory, default current directory")
77 parser.add_argument("--eval-timeout",
78 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
82 exgroup = parser.add_mutually_exclusive_group()
83 exgroup.add_argument("--print-dot", action="store_true",
84 help="Print workflow visualization in graphviz format and exit")
85 exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
86 exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
88 exgroup = parser.add_mutually_exclusive_group()
89 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
90 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
91 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
93 parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
95 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
97 exgroup = parser.add_mutually_exclusive_group()
98 exgroup.add_argument("--enable-reuse", action="store_true",
99 default=True, dest="enable_reuse",
100 help="Enable job or container reuse (default)")
101 exgroup.add_argument("--disable-reuse", action="store_false",
102 default=True, dest="enable_reuse",
103 help="Disable job or container reuse")
105 parser.add_argument("--project-uuid", metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
106 parser.add_argument("--output-name", help="Name to use for collection that stores the final output.", default=None)
107 parser.add_argument("--output-tags", help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
108 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
109 help="Ignore Docker image version when deciding whether to reuse past jobs.",
112 exgroup = parser.add_mutually_exclusive_group()
113 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
114 default=True, dest="submit")
115 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
116 default=True, dest="submit")
117 exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
118 dest="create_workflow")
119 exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
120 exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
122 exgroup = parser.add_mutually_exclusive_group()
123 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
124 default=True, dest="wait")
125 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
126 default=True, dest="wait")
128 exgroup = parser.add_mutually_exclusive_group()
129 exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
130 default=True, dest="log_timestamps")
131 exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
132 default=True, dest="log_timestamps")
134 parser.add_argument("--api",
135 default=None, dest="work_api",
136 choices=("jobs", "containers"),
137 help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")
139 parser.add_argument("--compute-checksum", action="store_true", default=False,
140 help="Compute checksum of contents while collecting outputs",
141 dest="compute_checksum")
143 parser.add_argument("--submit-runner-ram", type=int,
144 help="RAM (in MiB) required for the workflow runner job (default 1024)",
147 parser.add_argument("--submit-runner-image",
148 help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
151 parser.add_argument("--always-submit-runner", action="store_true",
152 help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
155 exgroup = parser.add_mutually_exclusive_group()
156 exgroup.add_argument("--submit-request-uuid",
158 help="Update and commit to supplied container request instead of creating a new one (containers API only).",
160 exgroup.add_argument("--submit-runner-cluster",
161 help="Submit workflow runner to a remote cluster (containers API only)",
163 metavar="CLUSTER_ID")
165 parser.add_argument("--collection-cache-size", type=int,
167 help="Collection cache size (in MiB, default 256).")
169 parser.add_argument("--name",
170 help="Name to use for workflow execution instance.",
173 parser.add_argument("--on-error",
174 help="Desired workflow behavior when a step fails. One of 'stop' (do not submit any more steps) or "
175 "'continue' (may submit other steps that are not downstream from the error). Default is 'continue'.",
176 default="continue", choices=("stop", "continue"))
178 parser.add_argument("--enable-dev", action="store_true",
179 help="Enable loading and running development versions "
180 "of CWL spec.", default=False)
181 parser.add_argument('--storage-classes', default="default",
182 help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
184 parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
185 help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
188 parser.add_argument("--priority", type=int,
189 help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
190 default=DEFAULT_PRIORITY)
192 parser.add_argument("--disable-validate", dest="do_validate",
193 action="store_false", default=True,
194 help=argparse.SUPPRESS)
196 parser.add_argument("--disable-js-validation",
197 action="store_true", default=False,
198 help=argparse.SUPPRESS)
200 parser.add_argument("--thread-count", type=int,
201 default=1, help="Number of threads to use for job submit and output collection.")
203 parser.add_argument("--http-timeout", type=int,
204 default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
206 exgroup = parser.add_mutually_exclusive_group()
207 exgroup.add_argument("--trash-intermediate", action="store_true",
208 default=False, dest="trash_intermediate",
209 help="Immediately trash intermediate outputs on workflow success.")
210 exgroup.add_argument("--no-trash-intermediate", action="store_false",
211 default=False, dest="trash_intermediate",
212 help="Do not trash intermediate outputs (default).")
214 parser.add_argument("workflow", default=None, help="The workflow to execute")
215 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
220 cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
221 cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
# Register the Arvados extension schema for each supported CWL version from
# that version's own resource file.  Reading the v1.0 file separately matters:
# if its stream is opened and then rebound before being read, the v1.0 text is
# never loaded (and the stream never closed), and "v1.0" documents end up
# validated against the v1.1 extension schema.
res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.0.yml')
try:
    customschema_v1_0 = res.read()
finally:
    # Always release the resource stream, even if the read fails.
    res.close()

res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.1.yml')
try:
    # `customschema` stays bound to the v1.1 schema text, so any later
    # reference to that name sees the same value as before.
    customschema = res.read()
finally:
    res.close()

use_custom_schema("v1.0", "http://arvados.org/cwl", customschema_v1_0)
use_custom_schema("v1.1.0-dev1", "http://arvados.org/cwl", customschema)
228 cwltool.process.supportedProcessRequirements.extend([
229 "http://arvados.org/cwl#RunInSingleContainer",
230 "http://arvados.org/cwl#OutputDirType",
231 "http://arvados.org/cwl#RuntimeConstraints",
232 "http://arvados.org/cwl#PartitionRequirement",
233 "http://arvados.org/cwl#APIRequirement",
234 "http://commonwl.org/cwltool#LoadListingRequirement",
235 "http://arvados.org/cwl#IntermediateOutput",
236 "http://arvados.org/cwl#ReuseRequirement",
237 "http://arvados.org/cwl#ClusterTarget"
240 def exit_signal_handler(sigcode, frame):
241 logger.error(str(u"Caught signal {}, exiting.").format(sigcode))
244 def main(args, stdout, stderr, api_client=None, keep_client=None,
245 install_sig_handlers=True):
246 parser = arg_parser()
248 job_order_object = None
249 arvargs = parser.parse_args(args)
251 if len(arvargs.storage_classes.strip().split(',')) > 1:
252 logger.error(str(u"Multiple storage classes are not supported currently."))
255 arvargs.use_container = True
256 arvargs.relax_path_checks = True
257 arvargs.print_supported_versions = False
259 if install_sig_handlers:
260 arv_cmd.install_signal_handlers()
262 if arvargs.update_workflow:
263 if arvargs.update_workflow.find('-7fd4e-') == 5:
264 want_api = 'containers'
265 elif arvargs.update_workflow.find('-p5p6p-') == 5:
269 if want_api and arvargs.work_api and want_api != arvargs.work_api:
270 logger.error(str(u'--update-workflow arg {!r} uses {!r} API, but --api={!r} specified').format(
271 arvargs.update_workflow, want_api, arvargs.work_api))
273 arvargs.work_api = want_api
275 if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
276 job_order_object = ({}, "")
280 for key, val in viewitems(cwltool.argparser.get_default_args()):
281 if not hasattr(arvargs, key):
282 setattr(arvargs, key, val)
285 if api_client is None:
286 api_client = arvados.safeapi.ThreadSafeApiCache(
287 api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
288 keep_params={"num_retries": 4})
289 keep_client = api_client.keep
290 # Make an API object now so errors are reported early.
291 api_client.users().current().execute()
292 if keep_client is None:
293 keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
294 executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
296 logger.exception("Error creating the Arvados CWL Executor")
299 # Note that unless in debug mode, some stack traces related to user
300 # workflow errors may be suppressed. See ArvadosJob.done().
302 logger.setLevel(logging.DEBUG)
303 logging.getLogger('arvados').setLevel(logging.DEBUG)
306 logger.setLevel(logging.WARN)
307 logging.getLogger('arvados').setLevel(logging.WARN)
308 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
311 metrics.setLevel(logging.DEBUG)
312 logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
314 if arvargs.log_timestamps:
315 arvados.log_handler.setFormatter(logging.Formatter(
316 '%(asctime)s %(name)s %(levelname)s: %(message)s',
317 '%Y-%m-%d %H:%M:%S'))
319 arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
321 if stdout is sys.stdout:
322 # cwltool.main has code to work around encoding issues with
323 # sys.stdout and unix pipes (they default to ASCII encoding,
324 # we want utf-8), so when stdout is sys.stdout set it to None
325 # to take advantage of that. Don't override it for all cases
326 # since we still want to be able to capture stdout for the
330 return cwltool.main.main(args=arvargs,
333 executor=executor.arv_executor,
334 versionfunc=versionstring,
335 job_order_object=job_order_object,
336 logger_handler=arvados.log_handler,
337 custom_schema_callback=add_arv_hints,
338 loadingContext=executor.loadingContext,
339 runtimeContext=executor.runtimeContext)