2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
14 import pkg_resources # part of setuptools
16 from schema_salad.sourceline import SourceLine
17 import schema_salad.validate as validate
19 import cwltool.workflow
20 import cwltool.process
21 import cwltool.argparser
22 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
23 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29 import arvados.commands._util as arv_cmd
30 from arvados.api import OrderedJsonModel
32 from .perf import Perf
33 from ._version import __version__
34 from .executor import ArvCwlExecutor
36 # These aren't used directly in this file but
37 # other code expects to import them from here
38 from .arvcontainer import ArvadosContainer
39 from .arvjob import ArvadosJob
40 from .arvtool import ArvadosCommandTool
41 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
42 from .util import get_current_container
43 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
44 from .arvworkflow import ArvadosWorkflow
46 logger = logging.getLogger('arvados.cwl-runner')
# A second, separately named logger for metrics messages; main() adjusts its
# level independently of `logger`.
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48 logger.setLevel(logging.INFO)
# Install a formatter on the Arvados log handler whose format string starts
# with %(asctime)s, i.e. log lines are timestamp-prefixed.
# NOTE(review): the remaining arguments / closing parentheses of this call are
# not visible here.
50 arvados.log_handler.setFormatter(logging.Formatter(
51 '%(asctime)s %(name)s %(levelname)s: %(message)s',
55 """Print version string of key packages for provenance and debugging."""
# NOTE(review): the `def` line for this body is not visible here; elsewhere in
# this file the function is referred to as versionstring() -- confirm.
# Each pkg_resources.require() result is indexed with [0] below to read the
# version of the named distribution.
57 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
58 arvpkg = pkg_resources.require("arvados-python-client")
59 cwlpkg = pkg_resources.require("cwltool")
# Resulting layout:
#   "<sys.argv[0]> <arvados-cwl-runner version>, arvados-python-client <version>, cwltool <version>"
61 return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
62 "arvados-python-client", arvpkg[0].version,
63 "cwltool", cwlpkg[0].version)
66 def arg_parser(): # type: () -> argparse.ArgumentParser
# Builds the argparse parser describing the Arvados CWL runner command line.
# NOTE(review): several add_argument() calls below are missing their trailing
# arguments in this view, and no return statement is visible. main() calls
# parse_args() on the result of arg_parser(), so the parser is presumably
# returned at the end -- confirm.
67 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
69 parser.add_argument("--basedir", type=str,
70 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
# --outdir defaults to the absolute path of the current working directory,
# evaluated when the parser is built.
71 parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
72 help="Output directory, default current directory")
74 parser.add_argument("--eval-timeout",
75 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
# Mutually exclusive: --print-dot, --version, --validate.
# versionstring() is called here, while the parser is being constructed.
79 exgroup = parser.add_mutually_exclusive_group()
80 exgroup.add_argument("--print-dot", action="store_true",
81 help="Print workflow visualization in graphviz format and exit")
82 exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
83 exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
# Mutually exclusive logging verbosity flags.
85 exgroup = parser.add_mutually_exclusive_group()
86 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
87 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
88 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
90 parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
92 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
# --enable-reuse / --disable-reuse both write dest "enable_reuse" (default True).
94 exgroup = parser.add_mutually_exclusive_group()
95 exgroup.add_argument("--enable-reuse", action="store_true",
96 default=True, dest="enable_reuse",
97 help="Enable job or container reuse (default)")
98 exgroup.add_argument("--disable-reuse", action="store_false",
99 default=True, dest="enable_reuse",
100 help="Disable job or container reuse")
102 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
103 parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
104 parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
105 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
106 help="Ignore Docker image version when deciding whether to reuse past jobs.",
# Mutually exclusive run modes. --submit and --local share dest "submit"
# (default True; --local stores False). --create-template is a deprecated
# alias that stores into "create_workflow".
109 exgroup = parser.add_mutually_exclusive_group()
110 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
111 default=True, dest="submit")
112 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
113 default=True, dest="submit")
114 exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
115 dest="create_workflow")
116 exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
117 exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
# --wait / --no-wait share dest "wait" (default True).
119 exgroup = parser.add_mutually_exclusive_group()
120 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
121 default=True, dest="wait")
122 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
123 default=True, dest="wait")
# --log-timestamps / --no-log-timestamps share dest "log_timestamps" (default True).
125 exgroup = parser.add_mutually_exclusive_group()
126 exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
127 default=True, dest="log_timestamps")
128 exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
129 default=True, dest="log_timestamps")
# --api is stored as "work_api"; it stays None unless "jobs" or "containers"
# is given explicitly.
131 parser.add_argument("--api", type=str,
132 default=None, dest="work_api",
133 choices=("jobs", "containers"),
134 help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")
136 parser.add_argument("--compute-checksum", action="store_true", default=False,
137 help="Compute checksum of contents while collecting outputs",
138 dest="compute_checksum")
140 parser.add_argument("--submit-runner-ram", type=int,
141 help="RAM (in MiB) required for the workflow runner job (default 1024)",
# The help text embeds this package's __version__ as the default image tag.
144 parser.add_argument("--submit-runner-image", type=str,
145 help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
148 parser.add_argument("--always-submit-runner", action="store_true",
149 help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
# Mutually exclusive: --submit-request-uuid and --submit-runner-cluster.
152 exgroup = parser.add_mutually_exclusive_group()
153 exgroup.add_argument("--submit-request-uuid", type=str,
155 help="Update and commit to supplied container request instead of creating a new one (containers API only).",
157 exgroup.add_argument("--submit-runner-cluster", type=str,
158 help="Submit workflow runner to a remote cluster (containers API only)",
160 metavar="CLUSTER_ID")
162 parser.add_argument("--collection-cache-size", type=int,
164 help="Collection cache size (in MiB, default 256).")
166 parser.add_argument("--name", type=str,
167 help="Name to use for workflow execution instance.",
170 parser.add_argument("--on-error",
171 help="Desired workflow behavior when a step fails. One of 'stop' (do not submit any more steps) or "
172 "'continue' (may submit other steps that are not downstream from the error). Default is 'continue'.",
173 default="continue", choices=("stop", "continue"))
175 parser.add_argument("--enable-dev", action="store_true",
176 help="Enable loading and running development versions "
177 "of CWL spec.", default=False)
# main() currently reports an error when more than one class is listed here.
178 parser.add_argument('--storage-classes', default="default", type=str,
179 help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
181 parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
182 help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
185 parser.add_argument("--priority", type=int,
186 help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
187 default=DEFAULT_PRIORITY)
# Hidden options: help=argparse.SUPPRESS keeps them out of the --help output.
# --disable-validate stores False into "do_validate" (default True).
189 parser.add_argument("--disable-validate", dest="do_validate",
190 action="store_false", default=True,
191 help=argparse.SUPPRESS)
193 parser.add_argument("--disable-js-validation",
194 action="store_true", default=False,
195 help=argparse.SUPPRESS)
197 parser.add_argument("--thread-count", type=int,
198 default=4, help="Number of threads to use for job submit and output collection.")
# Default is computed as 5*60 seconds; main() passes this value as the API
# client "timeout" parameter.
200 parser.add_argument("--http-timeout", type=int,
201 default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
# --trash-intermediate / --no-trash-intermediate share dest
# "trash_intermediate" (default False).
203 exgroup = parser.add_mutually_exclusive_group()
204 exgroup.add_argument("--trash-intermediate", action="store_true",
205 default=False, dest="trash_intermediate",
206 help="Immediately trash intermediate outputs on workflow success.")
207 exgroup.add_argument("--no-trash-intermediate", action="store_false",
208 default=False, dest="trash_intermediate",
209 help="Do not trash intermediate outputs (default).")
# Positionals: the workflow document, then everything left on the command
# line collected into "job_order" (argparse.REMAINDER).
211 parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
212 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
# NOTE(review): the `def` header for these statements is not visible here.
# main() passes `add_arv_hints` as custom_schema_callback, which is presumably
# this function -- confirm.
# Replace cwltool's accept-list regexes with a pattern that matches any string.
217 cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
218 cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
# Read the 'arv-cwl-schema.yml' resource shipped with this package and hand its
# contents to use_custom_schema for version "v1.0" under the
# http://arvados.org/cwl namespace.
219 res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
220 use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
# Append the Arvados extension requirement URIs (plus cwltool's
# LoadListingRequirement) to cwltool's list of supported process requirements.
# NOTE(review): the closing bracket of this call is not visible here.
222 cwltool.process.supportedProcessRequirements.extend([
223 "http://arvados.org/cwl#RunInSingleContainer",
224 "http://arvados.org/cwl#OutputDirType",
225 "http://arvados.org/cwl#RuntimeConstraints",
226 "http://arvados.org/cwl#PartitionRequirement",
227 "http://arvados.org/cwl#APIRequirement",
228 "http://commonwl.org/cwltool#LoadListingRequirement",
229 "http://arvados.org/cwl#IntermediateOutput",
230 "http://arvados.org/cwl#ReuseRequirement",
231 "http://arvados.org/cwl#ClusterTarget"
234 def exit_signal_handler(sigcode, frame):
# Signal handler taking (sigcode, frame); `frame` is not used in the visible
# body. Logs the received signal number at error level.
# NOTE(review): the message says "exiting", but no exit call is visible in this
# view -- confirm what follows the log statement.
235 logger.error("Caught signal {}, exiting.".format(sigcode))
238 def main(args, stdout, stderr, api_client=None, keep_client=None,
239 install_sig_handlers=True):
# Command-line entry point: parses `args`, prepares the API/Keep clients and an
# ArvCwlExecutor, configures logging, then delegates to cwltool.main.main and
# returns its result.
#   args                 -- argument list handed to the argparse parser
#   api_client           -- optional pre-built API client; created here when None
#   keep_client          -- optional Keep client; created here when None
#   install_sig_handlers -- when true, arv_cmd.install_signal_handlers() is called
# NOTE(review): a number of lines (branch headers, early returns, a `try:`,
# some call arguments) are not visible in this view; the comments below
# describe only what is shown.
240 parser = arg_parser()
242 job_order_object = None
243 arvargs = parser.parse_args(args)
# More than one comma-separated entry in --storage-classes is reported as an error.
245 if len(arvargs.storage_classes.strip().split(',')) > 1:
246 logger.error("Multiple storage classes are not supported currently.")
# Extra attributes set directly on the parsed namespace; they are not defined
# by arg_parser() (presumably options cwltool reads -- confirm).
249 arvargs.use_container = True
250 arvargs.relax_path_checks = True
251 arvargs.print_supported_versions = False
253 if install_sig_handlers:
254 arv_cmd.install_signal_handlers()
# For --update-workflow, derive the API from the UUID: '-7fd4e-' starting at
# index 5 selects 'containers'. The value assigned in the '-p5p6p-' branch (and
# any fallback branch) is not visible here.
256 if arvargs.update_workflow:
257 if arvargs.update_workflow.find('-7fd4e-') == 5:
258 want_api = 'containers'
259 elif arvargs.update_workflow.find('-p5p6p-') == 5:
# An explicit --api that disagrees with the API implied by the UUID is reported
# as an error.
263 if want_api and arvargs.work_api and want_api != arvargs.work_api:
264 logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
265 arvargs.update_workflow, want_api, arvargs.work_api))
267 arvargs.work_api = want_api
# Creating or updating a workflow without any job_order arguments: use an
# empty job order object.
269 if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
270 job_order_object = ({}, "")
# Copy over every cwltool default argument that the parsed namespace does not
# already have.
274 for key, val in cwltool.argparser.get_default_args().items():
275 if not hasattr(arvargs, key):
276 setattr(arvargs, key, val)
# Build an API client when the caller did not supply one; its request timeout
# comes from --http-timeout.
279 if api_client is None:
280 api_client = arvados.safeapi.ThreadSafeApiCache(
281 api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
282 keep_params={"num_retries": 4})
283 keep_client = api_client.keep
284 # Make an API object now so errors are reported early.
285 api_client.users().current().execute()
286 if keep_client is None:
287 keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
288 executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
# NOTE(review): the matching `try:` and the body of this handler are not
# visible here.
289 except Exception as e:
# Logging levels. NOTE(review): the conditions guarding the three groups below
# are not visible; presumably --debug, --quiet and --metrics respectively --
# confirm.
294 logger.setLevel(logging.DEBUG)
295 logging.getLogger('arvados').setLevel(logging.DEBUG)
298 logger.setLevel(logging.WARN)
299 logging.getLogger('arvados').setLevel(logging.WARN)
300 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
303 metrics.setLevel(logging.DEBUG)
304 logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
# Select the log line format: with a leading '%Y-%m-%d %H:%M:%S' timestamp when
# log_timestamps is set, otherwise without one (the `else:` separating the two
# setFormatter calls is presumably on a line not visible here).
306 if arvargs.log_timestamps:
307 arvados.log_handler.setFormatter(logging.Formatter(
308 '%(asctime)s %(name)s %(levelname)s: %(message)s',
309 '%Y-%m-%d %H:%M:%S'))
311 arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
# Hand off to cwltool's main, supplying the Arvados executor, version function,
# log handler, schema callback and the executor's loading/runtime contexts.
# Some keyword arguments of this call are not visible in this view.
313 return cwltool.main.main(args=arvargs,
316 executor=executor.arv_executor,
317 versionfunc=versionstring,
318 job_order_object=job_order_object,
319 logger_handler=arvados.log_handler,
320 custom_schema_callback=add_arv_hints,
321 loadingContext=executor.loadingContext,
322 runtimeContext=executor.runtimeContext)