2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
9 from builtins import str
16 import pkg_resources # part of setuptools
18 from schema_salad.sourceline import SourceLine
19 import schema_salad.validate as validate
21 import cwltool.workflow
22 import cwltool.process
23 import cwltool.argparser
24 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
25 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
29 from arvados.keep import KeepClient
30 from arvados.errors import ApiError
31 import arvados.commands._util as arv_cmd
32 from arvados.api import OrderedJsonModel
34 from .perf import Perf
35 from ._version import __version__
36 from .executor import ArvCwlExecutor
# These aren't used directly in this file, but
# other code expects to import them from here.
40 from .arvcontainer import ArvadosContainer
41 from .arvjob import ArvadosJob
42 from .arvtool import ArvadosCommandTool
43 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
44 from .util import get_current_container
45 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
46 from .arvworkflow import ArvadosWorkflow
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

# Default formatter for the shared Arvados log handler includes timestamps.
# main() installs its own formatter on this handler depending on
# --log-timestamps / --no-log-timestamps.
arvados.log_handler.setFormatter(logging.Formatter(
    '%(asctime)s %(name)s %(levelname)s: %(message)s',
    '%Y-%m-%d %H:%M:%S'))
def versionstring():
    """Return a version string for key packages, for provenance and debugging.

    The string names the running program (``sys.argv[0]``) followed by the
    installed versions of arvados-cwl-runner, arvados-python-client and
    cwltool, in the form
    ``"<argv0> <ver>, arvados-python-client <ver>, cwltool <ver>"``.
    It is used as the ``--version`` text.

    Raises whatever ``pkg_resources.require`` raises when one of the
    packages is not installed (e.g. ``pkg_resources.DistributionNotFound``).
    """
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    # require() returns a list of resolved distributions; this relies on
    # element 0 being the requested package itself.
    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command-line parser for arvados-cwl-runner.

    Defines the Arvados-specific options (submission mode, reuse, logging,
    runner resources, output handling, ...) plus two positionals:
    ``workflow`` (the workflow to execute) and ``job_order`` (all remaining
    arguments).
    """
    # NOTE(review): in this copy of the file several add_argument(...) calls
    # below are left unterminated (their trailing keyword arguments, such as
    # defaults, are missing), and there is no final `return parser`. Restore
    # them from version control before relying on this function.
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    # Default is computed when the parser is built (the current directory).
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",

    # Alternative actions that do something other than run the workflow:
    # at most one may be given.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    # Logging verbosity: at most one of these may be given.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # On/off switch: both options write to dest "enable_reuse" (default True);
    # --disable-reuse stores False.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",

    # Execution mode. --submit/--local share dest "submit" (default True).
    # Creating or updating a workflow is exclusive with both.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    # Deprecated alias: sets the same dest as --create-workflow.
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    # On/off switch on dest "wait" (default True).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    # On/off switch on dest "log_timestamps" (default True).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    # default=None: no API is chosen here; the help text describes how it is
    # selected when omitted.
    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,

    parser.add_argument("--always-submit-runner", action="store_true",
                        help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",

    # Reusing an existing container request and targeting a remote cluster
    # are mutually exclusive.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit-request-uuid", type=str,
                         help="Update and commit to supplied container request instead of creating a new one (containers API only).",
    exgroup.add_argument("--submit-runner-cluster", type=str,
                         help="Submit workflow runner to a remote cluster (containers API only)",
                         metavar="CLUSTER_ID")

    parser.add_argument("--collection-cache-size", type=int,
                        help="Collection cache size (in MiB, default 256).")

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",

    parser.add_argument("--on-error",
                        help="Desired workflow behavior when a step fails. One of 'stop' (do not submit any more steps) or "
                        "'continue' (may submit other steps that are not downstream from the error). Default is 'continue'.",
                        default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                        "of CWL spec.", default=False)
    # main() rejects values containing more than one comma-separated class.
    parser.add_argument('--storage-classes', default="default", type=str,
                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",

    # DEFAULT_PRIORITY is imported from .executor.
    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)

    # Hidden options (argparse.SUPPRESS omits them from --help output).
    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=1, help="Number of threads to use for job submit and output collection.")

    parser.add_argument("--http-timeout", type=int,
                        default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")

    # On/off switch on dest "trash_intermediate" (default False).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    # argparse.REMAINDER: every remaining command-line argument is collected
    # into the job_order list.
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
def add_arv_hints():
    """Register the Arvados CWL extensions with cwltool.

    - Replaces cwltool's filename accept-list patterns with a regex that
      matches everything.
    - Loads the packaged ``arv-cwl-schema.yml`` as the custom schema for the
      ``http://arvados.org/cwl`` namespace under CWL ``v1.0``.
    - Adds the Arvados (and cwltool LoadListing) requirement URIs to
      ``cwltool.process.supportedProcessRequirements``.

    Calling this more than once does not add duplicate requirement URIs.
    """
    # Match-everything patterns for cwltool's filename accept lists.
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE

    # Read the schema and always close the resource stream afterwards
    # (previously it was left open).
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        schema_text = res.read()
    finally:
        res.close()
    use_custom_schema("v1.0", "http://arvados.org/cwl", schema_text)

    arv_requirements = [
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement",
        "http://arvados.org/cwl#ClusterTarget",
    ]
    supported = cwltool.process.supportedProcessRequirements
    # This function is also handed to cwltool as custom_schema_callback, so it
    # may run more than once; only append URIs that are not yet registered.
    missing = [req for req in arv_requirements if req not in supported]
    supported.extend(missing)
def exit_signal_handler(sigcode, frame):
    """Signal handler that logs the received signal and terminates.

    Args:
        sigcode: the signal number that was delivered.
        frame: the interrupted stack frame (unused).

    Raises:
        SystemExit: always, with a status derived from ``sigcode``.
    """
    logger.error(str(u"Caught signal {}, exiting.").format(sigcode))
    # The message announces an exit, so actually exit. The negative status
    # mirrors how subprocess reports a child killed by a signal.
    sys.exit(-sigcode)
def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    """Command-line entry point for arvados-cwl-runner.

    Parses ``args`` with arg_parser() and adjusts the resulting namespace for
    cwltool. It then builds an ArvCwlExecutor backed by an Arvados API client
    and Keep client, configures logging, and delegates to
    ``cwltool.main.main()``, whose result is returned.

    Args:
        args: argument list handed to ``parser.parse_args``.
        stdout, stderr: output streams. NOTE(review): neither is referenced
            by any line present in this copy of the function — presumably
            meant to be forwarded to cwltool.main.main(); confirm.
        api_client: optional Arvados API client. When None, a new
            ``arvados.safeapi.ThreadSafeApiCache`` is created using
            ``--http-timeout`` as the request timeout and 4 Keep retries.
        keep_client: optional Keep client. When None, a KeepClient is built
            for ``api_client``. If ``api_client`` is None, this argument is
            replaced by the new API client's own ``keep``.
        install_sig_handlers: when true, call
            ``arv_cmd.install_signal_handlers()``.

    Returns:
        The value returned by ``cwltool.main.main()``.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    # Only a single storage class is supported.
    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error(str(u"Multiple storage classes are not supported currently."))
        # NOTE(review): no early return follows this error in this copy of
        # the file; confirm execution is meant to stop here.

    # Settings cwltool reads that this runner fixes instead of exposing
    # them as command-line options.
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
        # Infer the work API from the UUID's object-type infix, which is
        # expected to start at index 5.
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            # NOTE(review): the body of this branch (and any `else` that sets
            # want_api otherwise) is missing in this copy of the file.

        # An explicit --api must agree with the API implied by the UUID.
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error(str(u'--update-workflow arg {!r} uses {!r} API, but --api={!r} specified').format(
                arvargs.update_workflow, want_api, arvargs.work_api))

        arvargs.work_api = want_api

    # Creating/updating a workflow with no inputs on the command line: pass
    # an explicit empty job order object instead (presumably
    # (inputs, base URI) as cwltool.main expects — TODO confirm).
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    # Copy in cwltool's defaults for every option this parser does not
    # define, so cwltool.main.main() receives a complete namespace.
    for key, val in list(cwltool.argparser.get_default_args().items()):
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    # NOTE(review): the `try:` that pairs with the `except` below is missing
    # in this copy of the file.
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(
                api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
                keep_params={"num_retries": 4})
            # Use the new API client's Keep client; this overrides any
            # keep_client the caller passed.
            keep_client = api_client.keep
            # Make an API object now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        # NOTE(review): the handler body is missing in this copy of the file.

    # NOTE(review): the guard for this block (presumably `if arvargs.debug:`)
    # is missing in this copy of the file.
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    # NOTE(review): guard missing here (presumably `if arvargs.quiet:`).
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    # NOTE(review): guard missing here (presumably `if arvargs.metrics:`).
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    # Choose the formatter for the shared Arvados log handler.
    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    # NOTE(review): the `else:` for the branch above is missing in this copy
    # of the file; this line is the no-timestamp alternative.
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Run cwltool with the Arvados executor, the executor's loading/runtime
    # contexts, and add_arv_hints as the custom-schema callback.
    return cwltool.main.main(args=arvargs,
                             executor=executor.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=executor.loadingContext,
                             runtimeContext=executor.runtimeContext)