2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
14 import pkg_resources # part of setuptools
16 from schema_salad.sourceline import SourceLine
17 import schema_salad.validate as validate
19 import cwltool.workflow
20 import cwltool.process
21 import cwltool.argparser
22 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
23 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29 import arvados.commands._util as arv_cmd
30 from arvados.api import OrderedJsonModel
32 from .perf import Perf
33 from ._version import __version__
34 from .executor import ArvCwlExecutor
36 # These aren't used directly in this file but
37 # other code expects to import them from here
38 from .arvcontainer import ArvadosContainer
39 from .arvjob import ArvadosJob
40 from .arvtool import ArvadosCommandTool
41 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
42 from .util import get_current_container
43 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
44 from .arvworkflow import ArvadosWorkflow
# Module-level loggers: one for the runner's general messages and one for
# timing metrics (a child logger of the first, by name).
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
# Start at INFO; main() adjusts this level later on.
48 logger.setLevel(logging.INFO)
# Install a format with timestamp, logger name and level on arvados.log_handler.
# NOTE(review): the remaining argument(s) and closing parentheses of this
# Formatter(...) call are not present in this extract -- restore from upstream.
50 arvados.log_handler.setFormatter(logging.Formatter(
51 '%(asctime)s %(name)s %(levelname)s: %(message)s',
# NOTE(review): the `def` header for this body is missing from this extract.
# The docstring and `return` below presumably belong to versionstring(), which
# arg_parser() and main() reference -- confirm against upstream.
# Note that the value is returned to the caller rather than printed.
55 """Print version string of key packages for provenance and debugging."""
# Resolve the three packages through pkg_resources; only the first entry of
# each result is used below, for its .version attribute.
57 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
58 arvpkg = pkg_resources.require("arvados-python-client")
59 cwlpkg = pkg_resources.require("cwltool")
# Result layout: "<sys.argv[0]> <runner version>, arvados-python-client <version>, cwltool <version>".
61 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
62 "arvados-python-client", arvpkg[0].version,
63 "cwltool", cwlpkg[0].version)
# Build the argparse.ArgumentParser describing this runner's command-line options.
# NOTE(review): indentation and some lines were lost in this extract. The
# add_argument(...) calls for --eval-timeout, --ignore-docker-for-reuse,
# --submit-runner-ram, --submit-runner-image, --always-submit-runner,
# --submit-request-uuid, --name and --intermediate-output-ttl end on a trailing
# comma with their closing arguments missing, and no final `return` is visible
# even though main() uses the result -- restore from upstream before use.
66 def arg_parser(): # type: () -> argparse.ArgumentParser
67 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
# Input/output locations.
69 parser.add_argument("--basedir", type=str,
70 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
71 parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
72 help="Output directory, default current directory")
74 parser.add_argument("--eval-timeout",
75 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
# Mutually exclusive: print the graphviz rendering, print the version, or validate only.
79 exgroup = parser.add_mutually_exclusive_group()
80 exgroup.add_argument("--print-dot", action="store_true",
81 help="Print workflow visualization in graphviz format and exit")
82 exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
83 exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
# Mutually exclusive logging verbosity flags.
85 exgroup = parser.add_mutually_exclusive_group()
86 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
87 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
88 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
90 parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
92 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
# --enable-reuse / --disable-reuse write the same dest (enable_reuse, default True).
94 exgroup = parser.add_mutually_exclusive_group()
95 exgroup.add_argument("--enable-reuse", action="store_true",
96 default=True, dest="enable_reuse",
97 help="Enable job or container reuse (default)")
98 exgroup.add_argument("--disable-reuse", action="store_false",
99 default=True, dest="enable_reuse",
100 help="Disable job or container reuse")
# Ownership and naming/tagging of the final output collection.
102 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
103 parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
104 parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
105 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
106 help="Ignore Docker image version when deciding whether to reuse past jobs.",
# Mutually exclusive run modes. --submit/--local share dest `submit` (default True);
# --create-template is a deprecated alias writing to create_workflow;
# --update-workflow takes the UUID of the object to update.
109 exgroup = parser.add_mutually_exclusive_group()
110 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
111 default=True, dest="submit")
112 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
113 default=True, dest="submit")
114 exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
115 dest="create_workflow")
116 exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
117 exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
# --wait / --no-wait write the same dest (wait, default True).
119 exgroup = parser.add_mutually_exclusive_group()
120 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
121 default=True, dest="wait")
122 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
123 default=True, dest="wait")
# --log-timestamps / --no-log-timestamps write the same dest (log_timestamps, default True).
125 exgroup = parser.add_mutually_exclusive_group()
126 exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
127 default=True, dest="log_timestamps")
128 exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
129 default=True, dest="log_timestamps")
# --api is stored as work_api; None means no explicit choice was made.
131 parser.add_argument("--api", type=str,
132 default=None, dest="work_api",
133 choices=("jobs", "containers"),
134 help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")
136 parser.add_argument("--compute-checksum", action="store_true", default=False,
137 help="Compute checksum of contents while collecting outputs",
138 dest="compute_checksum")
# Options describing the workflow runner job itself.
140 parser.add_argument("--submit-runner-ram", type=int,
141 help="RAM (in MiB) required for the workflow runner job (default 1024)",
144 parser.add_argument("--submit-runner-image", type=str,
145 help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
148 parser.add_argument("--always-submit-runner", action="store_true",
149 help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
# Mutually exclusive: reuse an existing container request, or target a remote cluster.
152 exgroup = parser.add_mutually_exclusive_group()
153 exgroup.add_argument("--submit-request-uuid", type=str,
155 help="Update and commit to supplied container request instead of creating a new one (containers API only).",
157 exgroup.add_argument("--submit-runner-cluster", type=str,
158 help="Submit workflow runner to a remote cluster (containers API only)",
160 metavar="CLUSTER_ID")
162 parser.add_argument("--name", type=str,
163 help="Name to use for workflow execution instance.",
166 parser.add_argument("--on-error",
167 help="Desired workflow behavior when a step fails. One of 'stop' (do not submit any more steps) or "
168 "'continue' (may submit other steps that are not downstream from the error). Default is 'stop'.",
169 default="stop", choices=("stop", "continue"))
171 parser.add_argument("--enable-dev", action="store_true",
172 help="Enable loading and running development versions "
173 "of CWL spec.", default=False)
# Accepted as a comma-separated string here; main() rejects more than one entry.
174 parser.add_argument('--storage-classes', default="default", type=str,
175 help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
177 parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
178 help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
181 parser.add_argument("--priority", type=int,
182 help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
183 default=DEFAULT_PRIORITY)
# Hidden options: help is suppressed so they do not show up in --help output.
185 parser.add_argument("--disable-validate", dest="do_validate",
186 action="store_false", default=True,
187 help=argparse.SUPPRESS)
189 parser.add_argument("--disable-js-validation",
190 action="store_true", default=False,
191 help=argparse.SUPPRESS)
193 parser.add_argument("--thread-count", type=int,
194 default=4, help="Number of threads to use for job submit and output collection.")
# Default of 5*60 seconds; main() passes this value as the API client timeout.
196 parser.add_argument("--http-timeout", type=int,
197 default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
# --trash-intermediate / --no-trash-intermediate write the same dest (trash_intermediate, default False).
199 exgroup = parser.add_mutually_exclusive_group()
200 exgroup.add_argument("--trash-intermediate", action="store_true",
201 default=False, dest="trash_intermediate",
202 help="Immediately trash intermediate outputs on workflow success.")
203 exgroup.add_argument("--no-trash-intermediate", action="store_false",
204 default=False, dest="trash_intermediate",
205 help="Do not trash intermediate outputs (default).")
# Positionals: the workflow, then everything remaining on the command line as job_order.
207 parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
208 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
# NOTE(review): the `def` header for this body is missing from this extract.
# It is presumably add_arv_hints(), which main() passes to cwltool as
# custom_schema_callback -- confirm against upstream.
# Replace cwltool's accept-list patterns with one that matches any string, and
# point ACCEPTLIST_RE at that same compiled pattern.
213 cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
214 cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
# Read the packaged 'arv-cwl-schema.yml' resource and register its contents
# with cwltool for "v1.0" under the http://arvados.org/cwl name.
215 res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
216 use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
# Add these requirement identifiers to cwltool's list of supported process requirements.
# NOTE(review): the closing of this extend([...]) call is not present in this extract.
218 cwltool.process.supportedProcessRequirements.extend([
219 "http://arvados.org/cwl#RunInSingleContainer",
220 "http://arvados.org/cwl#OutputDirType",
221 "http://arvados.org/cwl#RuntimeConstraints",
222 "http://arvados.org/cwl#PartitionRequirement",
223 "http://arvados.org/cwl#APIRequirement",
224 "http://commonwl.org/cwltool#LoadListingRequirement",
225 "http://arvados.org/cwl#IntermediateOutput",
226 "http://arvados.org/cwl#ReuseRequirement",
227 "http://arvados.org/cwl#ClusterTarget"
# Signal handler taking the signal number and the current stack frame; it logs
# the signal number at error level.
# NOTE(review): the message says "exiting" but no exit call is visible in this
# extract -- a following line is presumably missing; confirm against upstream.
230 def exit_signal_handler(sigcode, frame):
231 logger.error("Caught signal {}, exiting.".format(sigcode))
# Entry point: parse `args`, build an ArvCwlExecutor, configure logging, and
# hand control to cwltool.main.main(), returning its result.
#
# Parameters visible in the signature:
#   args                 -- argument list given to parser.parse_args()
#   stdout, stderr       -- not referenced in the lines visible here
#   api_client           -- optional API client; one is created when None
#   keep_client          -- optional Keep client; one is created when None
#   install_sig_handlers -- when true, arv_cmd.install_signal_handlers() is called
#
# NOTE(review): this extract has lost its indentation and a number of lines
# (`try:`, `else:`, some `if` headers, early returns), so the control flow
# described below is partly inferred -- confirm against upstream.
234 def main(args, stdout, stderr, api_client=None, keep_client=None,
235 install_sig_handlers=True):
236 parser = arg_parser()
238 job_order_object = None
239 arvargs = parser.parse_args(args)
# A comma-separated list of more than one storage class is reported as an error.
# NOTE(review): whatever follows the error message (presumably an early return) is missing here.
241 if len(arvargs.storage_classes.strip().split(',')) > 1:
242 logger.error("Multiple storage classes are not supported currently.")
# Fixed settings placed on the namespace regardless of the command line.
245 arvargs.use_container = True
246 arvargs.relax_path_checks = True
247 arvargs.print_supported_versions = False
249 if install_sig_handlers:
250 arv_cmd.install_signal_handlers()
# For --update-workflow, infer the API from the UUID: '-7fd4e-' at offset 5
# selects 'containers'.
# NOTE(review): the assignment for the '-p5p6p-' branch and any fallback branch
# are missing from this extract.
252 if arvargs.update_workflow:
253 if arvargs.update_workflow.find('-7fd4e-') == 5:
254 want_api = 'containers'
255 elif arvargs.update_workflow.find('-p5p6p-') == 5:
# An explicit --api value that disagrees with the UUID-implied API is an error;
# the line visible after it adopts the implied API.
259 if want_api and arvargs.work_api and want_api != arvargs.work_api:
260 logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
261 arvargs.update_workflow, want_api, arvargs.work_api))
263 arvargs.work_api = want_api
# Creating or updating a workflow with no job order given: use an empty
# input object paired with an empty string.
265 if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
266 job_order_object = ({}, "")
# Copy every cwltool default argument that is not already set on arvargs.
270 for key, val in cwltool.argparser.get_default_args().items():
271 if not hasattr(arvargs, key):
272 setattr(arvargs, key, val)
# When no api_client was passed in, build a ThreadSafeApiCache using the
# --http-timeout value and take its keep client.
# NOTE(review): nesting is lost here, so which of the following lines sit
# inside this `if` cannot be determined from this extract.
275 if api_client is None:
276 api_client = arvados.safeapi.ThreadSafeApiCache(
277 api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
278 keep_params={"num_retries": 4})
279 keep_client = api_client.keep
280 # Make an API object now so errors are reported early.
281 api_client.users().current().execute()
282 if keep_client is None:
283 keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
284 executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
# NOTE(review): the matching `try:` and the body of this handler are missing from this extract.
285 except Exception as e:
# Logging verbosity. NOTE(review): the conditions guarding these three groups
# (presumably --debug, --quiet and --metrics) are missing from this extract.
290 logger.setLevel(logging.DEBUG)
291 logging.getLogger('arvados').setLevel(logging.DEBUG)
294 logger.setLevel(logging.WARN)
295 logging.getLogger('arvados').setLevel(logging.WARN)
296 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
299 metrics.setLevel(logging.DEBUG)
300 logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
# Choose the log line format: with a timestamp, or without one.
# NOTE(review): the `else:` separating the two formatter calls is missing here.
302 if arvargs.log_timestamps:
303 arvados.log_handler.setFormatter(logging.Formatter(
304 '%(asctime)s %(name)s %(levelname)s: %(message)s',
305 '%Y-%m-%d %H:%M:%S'))
307 arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
# Delegate to cwltool with the Arvados executor, version function, log handler,
# schema callback and the executor's loading/runtime contexts.
# NOTE(review): some keyword arguments of this call appear to be missing from this extract.
309 return cwltool.main.main(args=arvargs,
312 executor=executor.arv_executor,
313 versionfunc=versionstring,
314 job_order_object=job_order_object,
315 logger_handler=arvados.log_handler,
316 custom_schema_callback=add_arv_hints,
317 loadingContext=executor.loadingContext,
318 runtimeContext=executor.runtimeContext)