2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
14 import pkg_resources # part of setuptools
16 from schema_salad.sourceline import SourceLine
17 import schema_salad.validate as validate
19 import cwltool.workflow
20 import cwltool.process
21 import cwltool.argparser
22 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
23 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29 import arvados.commands._util as arv_cmd
30 from arvados.api import OrderedJsonModel
32 from .perf import Perf
33 from ._version import __version__
34 from .executor import ArvCwlExecutor
36 # These aren't used directly in this file, but
37 # other code expects to import them from here.
38 from .arvcontainer import ArvadosContainer
39 from .arvjob import ArvadosJob
40 from .arvtool import ArvadosCommandTool
41 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
42 from .util import get_current_container
43 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
44 from .arvworkflow import ArvadosWorkflow
# Module-level loggers: `logger` carries the runner's general output (INFO by
# default); `metrics` carries timing output, and main() may raise it to DEBUG.
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48 logger.setLevel(logging.INFO)
# Default format for the shared arvados log handler: timestamp, logger name,
# level, message. main() replaces it according to --log-timestamps.
50 arvados.log_handler.setFormatter(logging.Formatter(
51 '%(asctime)s %(name)s %(levelname)s: %(message)s',
55 """Return version string of key packages for provenance and debugging."""
# Look up the installed distributions through setuptools' pkg_resources.
# Only the first entry of each require() result is used below.
57 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
58 arvpkg = pkg_resources.require("arvados-python-client")
59 cwlpkg = pkg_resources.require("cwltool")
# Result shape: "<argv[0]> <runner version>, arvados-python-client <version>,
# cwltool <version>". Used for --version and passed to cwltool as versionfunc.
61 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
62 "arvados-python-client", arvpkg[0].version,
63 "cwltool", cwlpkg[0].version)
66 def arg_parser(): # type: () -> argparse.ArgumentParser
# Build the command-line parser for arvados-cwl-runner. main() calls
# parse_args() on it, then copies in cwltool's defaults for any option
# that is not defined here.
67 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
69 parser.add_argument("--basedir", type=str,
70 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
# Note: the --outdir default is computed when arg_parser() is called.
71 parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
72 help="Output directory, default current directory")
74 parser.add_argument("--eval-timeout",
75 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
# Informational modes; argparse rejects combining them. --version calls
# versionstring() while the parser is being built.
79 exgroup = parser.add_mutually_exclusive_group()
80 exgroup.add_argument("--print-dot", action="store_true",
81 help="Print workflow visualization in graphviz format and exit")
82 exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
83 exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
# Logging verbosity (mutually exclusive); main() maps these onto log levels.
85 exgroup = parser.add_mutually_exclusive_group()
86 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
87 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
88 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
90 parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
92 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
# On/off flag pair sharing one dest (enable_reuse, default True); being in a
# mutually exclusive group, at most one of the pair may be given.
94 exgroup = parser.add_mutually_exclusive_group()
95 exgroup.add_argument("--enable-reuse", action="store_true",
96 default=True, dest="enable_reuse",
97 help="Enable job or container reuse (default)")
98 exgroup.add_argument("--disable-reuse", action="store_false",
99 default=True, dest="enable_reuse",
100 help="Disable job or container reuse")
102 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
103 parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
104 parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
105 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
106 help="Ignore Docker image version when deciding whether to reuse past jobs.",
# Run mode: submit a runner (default, dest=submit), run locally, or register
# a workflow. --create-template is a deprecated alias that sets
# create_workflow.
109 exgroup = parser.add_mutually_exclusive_group()
110 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
111 default=True, dest="submit")
112 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
113 default=True, dest="submit")
114 exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
115 dest="create_workflow")
116 exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
117 exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
# Paired flags sharing dest "wait" (default True).
119 exgroup = parser.add_mutually_exclusive_group()
120 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
121 default=True, dest="wait")
122 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
123 default=True, dest="wait")
# Paired flags sharing dest "log_timestamps" (default True); main() picks the
# log formatter from this value.
125 exgroup = parser.add_mutually_exclusive_group()
126 exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
127 default=True, dest="log_timestamps")
128 exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
129 default=True, dest="log_timestamps")
# work_api defaults to None; main() may set it from the --update-workflow UUID.
131 parser.add_argument("--api", type=str,
132 default=None, dest="work_api",
133 choices=("jobs", "containers"),
134 help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")
136 parser.add_argument("--compute-checksum", action="store_true", default=False,
137 help="Compute checksum of contents while collecting outputs",
138 dest="compute_checksum")
140 parser.add_argument("--submit-runner-ram", type=int,
141 help="RAM (in MiB) required for the workflow runner job (default 1024)",
# The default image tag in the help text is this package's __version__.
144 parser.add_argument("--submit-runner-image", type=str,
145 help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
148 parser.add_argument("--always-submit-runner", action="store_true",
149 help="Always submit a runner to manage the workflow, even when running only a single CommandLineTool",
# Reusing an existing container request and targeting a remote cluster
# cannot be combined.
152 exgroup = parser.add_mutually_exclusive_group()
153 exgroup.add_argument("--submit-request-uuid", type=str,
155 help="Update and commit to supplied container request instead of creating a new one (containers API only).")
156 exgroup.add_argument("--submit-runner-cluster", type=str,
157 help="Submit toplevel runner to a remote cluster (containers API only)",
160 parser.add_argument("--name", type=str,
161 help="Name to use for workflow execution instance.",
164 parser.add_argument("--on-error", type=str,
165 help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
166 "Default is 'continue'.", default="continue", choices=("stop", "continue"))
168 parser.add_argument("--enable-dev", action="store_true",
169 help="Enable loading and running development versions "
170 "of CWL spec.", default=False)
# Accepted as a comma-separated list, but main() rejects more than one class.
171 parser.add_argument('--storage-classes', default="default", type=str,
172 help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
174 parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
175 help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
178 parser.add_argument("--priority", type=int,
179 help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
180 default=DEFAULT_PRIORITY)
# Hidden options (help=argparse.SUPPRESS keeps them out of --help output).
182 parser.add_argument("--disable-validate", dest="do_validate",
183 action="store_false", default=True,
184 help=argparse.SUPPRESS)
186 parser.add_argument("--disable-js-validation",
187 action="store_true", default=False,
188 help=argparse.SUPPRESS)
190 parser.add_argument("--thread-count", type=int,
191 default=4, help="Number of threads to use for job submit and output collection.")
# Passed by main() as the API client "timeout" parameter.
193 parser.add_argument("--http-timeout", type=int,
194 default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
# Paired flags sharing dest "trash_intermediate" (default False).
196 exgroup = parser.add_mutually_exclusive_group()
197 exgroup.add_argument("--trash-intermediate", action="store_true",
198 default=False, dest="trash_intermediate",
199 help="Immediately trash intermediate outputs on workflow success.")
200 exgroup.add_argument("--no-trash-intermediate", action="store_false",
201 default=False, dest="trash_intermediate",
202 help="Do not trash intermediate outputs (default).")
# Positional arguments: the workflow document, then every remaining argument
# (argparse.REMAINDER) collected unparsed as the job order / tool inputs.
204 parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
205 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
# Replace cwltool's accept-list regexes with one that matches any string.
# NOTE(review): these presumably control which file/path names cwltool
# accepts (main() also forces relax_path_checks) - confirm against
# cwltool.command_line_tool.
210 cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
211 cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
# Register the Arvados CWL extension schema (arv-cwl-schema.yml, packaged
# with this module) with cwltool for CWL v1.0 under http://arvados.org/cwl.
# NOTE(review): the stream from resource_stream() is never closed;
# pkg_resources.resource_string() would return the bytes without leaving a
# handle open.
212 res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
213 use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
# Add the Arvados extension requirements (plus cwltool's
# LoadListingRequirement) to cwltool's supported-requirements list,
# presumably so cwltool does not reject them as unsupported.
215 cwltool.process.supportedProcessRequirements.extend([
216 "http://arvados.org/cwl#RunInSingleContainer",
217 "http://arvados.org/cwl#OutputDirType",
218 "http://arvados.org/cwl#RuntimeConstraints",
219 "http://arvados.org/cwl#PartitionRequirement",
220 "http://arvados.org/cwl#APIRequirement",
221 "http://commonwl.org/cwltool#LoadListingRequirement",
222 "http://arvados.org/cwl#IntermediateOutput",
223 "http://arvados.org/cwl#ReuseRequirement",
224 "http://arvados.org/cwl#ClusterTarget"
227 def exit_signal_handler(sigcode, frame):
# Signal handler with the (signum, frame) signature that signal.signal()
# expects. It logs the caught signal number as an error.
228 logger.error("Caught signal {}, exiting.".format(sigcode))
231 def main(args, stdout, stderr, api_client=None, keep_client=None,
232 install_sig_handlers=True):
# Entry point: parse `args`, set up the Arvados API/Keep clients and an
# ArvCwlExecutor, configure logging, then hand off to cwltool.main.main()
# and return its result.
# api_client / keep_client let callers inject pre-built clients.
# install_sig_handlers controls whether arvados' standard signal handlers
# are installed.
233 parser = arg_parser()
235 job_order_object = None
236 arvargs = parser.parse_args(args)
# Only one storage class is supported for the output collection; more than
# one comma-separated entry is reported as an error.
238 if len(arvargs.storage_classes.strip().split(',')) > 1:
239 logger.error("Multiple storage classes are not supported currently.")
# Fix cwltool options that this runner does not let users choose.
242 arvargs.use_container = True
243 arvargs.relax_path_checks = True
244 arvargs.print_supported_versions = False
246 if install_sig_handlers:
247 arv_cmd.install_signal_handlers()
# Infer the work API from the UUID given to --update-workflow. The type infix
# is expected at index 5 (after a 5-character prefix): '7fd4e' selects the
# containers API (workflow). 'p5p6p' presumably selects the jobs API
# (pipeline template, per the --create-workflow help text).
249 if arvargs.update_workflow:
250 if arvargs.update_workflow.find('-7fd4e-') == 5:
251 want_api = 'containers'
252 elif arvargs.update_workflow.find('-p5p6p-') == 5:
# An explicit --api that disagrees with the UUID's type is reported as an
# error; otherwise the inferred API is used.
256 if want_api and arvargs.work_api and want_api != arvargs.work_api:
257 logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
258 arvargs.update_workflow, want_api, arvargs.work_api))
260 arvargs.work_api = want_api
# When creating/updating a workflow without inputs, pass an empty job order
# object ({}, "") to cwltool.main.main() below.
262 if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
263 job_order_object = ({}, "")
# Copy in cwltool's default for every option this parser does not define.
267 for key, val in cwltool.argparser.get_default_args().items():
268 if not hasattr(arvargs, key):
269 setattr(arvargs, key, val)
# Build the clients and the executor. Without an injected api_client, a
# ThreadSafeApiCache is created (using --http-timeout) and its .keep client
# is used. NOTE(review): if that assignment sits inside the
# `api_client is None` branch, it replaces any keep_client the caller passed.
272 if api_client is None:
273 api_client = arvados.safeapi.ThreadSafeApiCache(
274 api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
275 keep_params={"num_retries": 4})
276 keep_client = api_client.keep
277 # Make an API object now so errors are reported early.
278 api_client.users().current().execute()
279 if keep_client is None:
280 keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
281 executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
282 except Exception as e:
# Verbosity: presumably DEBUG for --debug and WARN for --quiet, applied to
# this runner's logger and the 'arvados' logger hierarchy.
287 logger.setLevel(logging.DEBUG)
288 logging.getLogger('arvados').setLevel(logging.DEBUG)
291 logger.setLevel(logging.WARN)
292 logging.getLogger('arvados').setLevel(logging.WARN)
293 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
# Enable timing output from both this runner and cwltool (presumably for
# --metrics).
296 metrics.setLevel(logging.DEBUG)
297 logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
# Choose the log line format according to --log-timestamps /
# --no-log-timestamps.
299 if arvargs.log_timestamps:
300 arvados.log_handler.setFormatter(logging.Formatter(
301 '%(asctime)s %(name)s %(levelname)s: %(message)s',
302 '%Y-%m-%d %H:%M:%S'))
304 arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
# Delegate the run to cwltool. Plug in the Arvados executor, the version
# callback, the schema hook (add_arv_hints), and the executor's
# loading/runtime contexts.
306 return cwltool.main.main(args=arvargs,
309 executor=executor.arv_executor,
310 versionfunc=versionstring,
311 job_order_object=job_order_object,
312 logger_handler=arvados.log_handler,
313 custom_schema_callback=add_arv_hints,
314 loadingContext=executor.loadingContext,
315 runtimeContext=executor.runtimeContext)