2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
14 import pkg_resources # part of setuptools
16 from schema_salad.sourceline import SourceLine
17 import schema_salad.validate as validate
19 import cwltool.workflow
20 import cwltool.process
21 import cwltool.argparser
22 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
23 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29 import arvados.commands._util as arv_cmd
30 from arvados.api import OrderedJsonModel
32 from .perf import Perf
33 from ._version import __version__
34 from .executor import ArvCwlExecutor
# These aren't used directly in this file, but
# other code expects to import them from here.
38 from .arvcontainer import ArvadosContainer
39 from .arvjob import ArvadosJob
40 from .arvtool import ArvadosCommandTool
41 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
42 from .util import get_current_container
43 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
44 from .arvworkflow import ArvadosWorkflow
# Package-level loggers: `logger` carries the runner's general messages,
# `metrics` is a child logger that main() may switch to DEBUG.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

# Install a timestamped format on the Arvados log handler at import time;
# main() installs a formatter again once the command line has been parsed.
# NOTE(review): the date-format argument and the closing parentheses were
# missing from the extracted text.  They are restored here to match the
# identical formatter used in main() -- confirm against the upstream file.
arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
# NOTE(review): the ``def`` line was missing from the extracted text.  The
# name and the zero-argument signature are taken from the call sites
# (``version=versionstring()`` in arg_parser(), ``versionfunc=versionstring``
# in main()) -- confirm against the upstream file.
def versionstring():
    """Return a version string of key packages for provenance and debugging.

    The string has the form
    ``<argv0> <runner version>, arvados-python-client <version>, cwltool <version>``
    where ``<argv0>`` is ``sys.argv[0]`` and the versions are those reported
    by ``pkg_resources`` for the installed distributions.
    """

    # require() returns a list of distributions; element 0 is the
    # distribution that was asked for.
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command line parser for the Arvados CWL runner.

    Returns:
        An ``argparse.ArgumentParser`` with all runner options plus the two
        positionals ``workflow`` and ``job_order``.

    Note that building the parser calls versionstring(), because the
    ``--version`` action needs its text up front.
    """
    # NOTE(review): several add_argument() calls and the final return were
    # truncated in the extracted text.  Unless a comment says otherwise, a
    # truncated call is closed with argparse's own implicit default spelled
    # out (None, or False for store_true), which cannot change behaviour
    # relative to leaving it unspecified.  Confirm against the upstream file.
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    # NOTE(review): default=20 follows the help string; type=float is an
    # assumption (a numeric timeout) -- confirm.
    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    # Actions that make the runner do something other than run the workflow.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    # Logging verbosity.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Both flags write to the same destination, so they share one default.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    # What to do with the workflow: run it (submitted or locally driven),
    # or register/update it on the server.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    # The "default 1024" in the help text is not applied here; the parser
    # leaves the value as None when the option is absent.
    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=None)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--always-submit-runner", action="store_true",
                        help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    # NOTE(review): metavar="UUID" is chosen for consistency with the other
    # UUID-valued options; it only affects the help output -- confirm.
    exgroup.add_argument("--submit-request-uuid", type=str,
                         default=None,
                         help="Update and commit to supplied container request instead of creating a new one (containers API only).",
                         metavar="UUID")
    exgroup.add_argument("--submit-runner-cluster", type=str,
                         help="Submit workflow runner to a remote cluster (containers API only)",
                         default=None,
                         metavar="CLUSTER_ID")

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)
    parser.add_argument('--storage-classes', default="default", type=str,
                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    # NOTE(review): default=0 follows the help string -- confirm.
    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)

    # Hidden options (suppressed from --help).
    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    parser.add_argument("--http-timeout", type=int,
                        default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    # Positionals: the workflow document, then everything else on the
    # command line is collected verbatim as the job order.
    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
# NOTE(review): the ``def`` line and the closing ``])`` were missing from the
# extracted text.  The name is taken from ``custom_schema_callback=add_arv_hints``
# in main(); the zero-argument signature is an assumption -- confirm against
# the upstream file.
def add_arv_hints():
    """Register the Arvados CWL extensions with cwltool.

    Does three things, all of which mutate cwltool module state:
    replaces cwltool's acceptlist regexes with a match-everything pattern,
    loads the bundled ``arv-cwl-schema.yml`` as a custom schema for CWL
    v1.0, and adds the extension requirement URIs to cwltool's list of
    supported process requirements.
    """
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE

    # Read the packaged schema and close the stream explicitly; the handle
    # was previously left for the garbage collector to clean up.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        schema_text = res.read()
    finally:
        res.close()
    use_custom_schema("v1.0", "http://arvados.org/cwl", schema_text)

    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement",
        "http://arvados.org/cwl#ClusterTarget"
    ])
def exit_signal_handler(sigcode, frame):
    """Signal handler that logs the received signal and exits.

    Args:
        sigcode: number of the signal that was caught.
        frame: current stack frame (unused; required by the handler signature).
    """
    logger.error("Caught signal {}, exiting.".format(sigcode))
    # NOTE(review): the statement following the log call was missing from the
    # extracted text.  The message says the process exits; the exit status
    # used here (negated signal number) is an assumption -- confirm against
    # the upstream file.
    sys.exit(-sigcode)
def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    """Command line entry point: parse options, set up Arvados, run cwltool.

    Args:
        args: list of command line arguments (without the program name).
        stdout, stderr: streams handed through to cwltool.
        api_client: optional pre-built Arvados API client; when None a
            ThreadSafeApiCache is created and its Keep client is used.
        keep_client: optional Keep client; created from ``api_client`` when
            still None after the API client has been set up.
        install_sig_handlers: when true, install the Arvados command line
            signal handlers.

    Returns:
        The value returned by ``cwltool.main.main``, or 1 when option
        checking or client/executor setup fails.
    """
    # NOTE(review): the extracted text of this function had lost several
    # short lines (returns, try/else, the `if arvargs.<flag>:` guards).
    # Every line marked "restored" below was reconstructed from the
    # surrounding code and option definitions -- confirm against upstream.
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")
        return 1  # restored

    # Settings cwltool expects on the args object that this parser does not
    # expose as options.
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
        # The type infix at offset 5 of the UUID tells which API the
        # existing object belongs to.
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'  # restored; the other value --api accepts
        else:
            want_api = None  # restored; the check below allows a falsy value
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1  # restored
        arvargs.work_api = want_api

    # Creating/updating a workflow without inputs: hand cwltool an empty
    # job order instead of letting it look for one.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    # NOTE(review): restored call, not visible in the extracted text; it is
    # also passed to cwltool below as custom_schema_callback -- confirm.
    add_arv_hints()

    # Fill in every cwltool option this parser does not define with
    # cwltool's own default.
    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    try:  # restored; required by the except clause below
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(
                api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
                keep_params={"num_retries": 4})
            keep_client = api_client.keep
            # Make an API object now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        logger.error(e)  # restored
        return 1  # restored

    if arvargs.debug:  # restored guard (--debug)
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:  # restored guard (--quiet)
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:  # restored guard (--metrics)
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:  # restored
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,  # restored
                             stderr=stderr,  # restored
                             executor=executor.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=executor.loadingContext,
                             runtimeContext=executor.runtimeContext)