2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
14 import pkg_resources # part of setuptools
16 from schema_salad.sourceline import SourceLine
17 import schema_salad.validate as validate
19 import cwltool.workflow
20 import cwltool.process
21 import cwltool.argparser
22 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
23 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29 import arvados.commands._util as arv_cmd
30 from arvados.api import OrderedJsonModel
32 from .perf import Perf
33 from ._version import __version__
34 from .executor import ArvCwlExecutor
# These aren't used directly in this file, but
# other code expects to import them from here.
38 from .arvcontainer import ArvadosContainer
39 from .arvjob import ArvadosJob
40 from .arvtool import ArvadosCommandTool
41 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
42 from .util import get_current_container
43 from .executor import RuntimeStatusLoggingHandler, DEFAULT_PRIORITY
44 from .arvworkflow import ArvadosWorkflow
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

# Default format for the shared Arvados log handler: timestamped lines.
# main() re-applies this format, or switches to an untimestamped one,
# according to --log-timestamps / --no-log-timestamps.
arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
def versionstring():
    """Return a version string of key packages for provenance and debugging.

    The result has the form
    ``"<argv[0]> <ver>, arvados-python-client <ver>, cwltool <ver>"``,
    where the first version is that of the installed arvados-cwl-runner
    distribution. It is used both for ``--version`` and as cwltool's
    ``versionfunc``.

    pkg_resources.require() raises if one of the distributions is not
    installed.
    """
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command line parser for arvados-cwl-runner.

    Only Arvados-specific options and a subset of cwltool's options are
    declared here; main() fills in any remaining cwltool option from
    cwltool.argparser.get_default_args() after parsing.

    Returns:
        argparse.ArgumentParser: the configured parser.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    # Default (20 seconds) matches the value advertised in the help text.
    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Paired on/off flags share one dest so the last flag given wins.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    # NOTE(review): the help text advertises arvados/jobs:<version> as the
    # default, but the value stored here is None; presumably the image is
    # chosen downstream when unset — confirm.
    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit-request-uuid", type=str,
                         default=None,
                         help="Update and commit to supplied container request instead of creating a new one (containers API only).")
    exgroup.add_argument("--submit-runner-cluster", type=str,
                         help="Submit toplevel runner to a remote cluster (containers API only)",
                         default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)
    parser.add_argument('--storage-classes', default="default", type=str,
                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)

    # Hidden options (help suppressed).
    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    parser.add_argument("--http-timeout", type=int,
                        default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    # REMAINDER: everything after the workflow is passed through as the job order.
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
def add_arv_hints():
    """Register the Arvados CWL extensions with cwltool.

    * Replaces cwltool's ACCEPTLIST_EN_RELAXED_RE / ACCEPTLIST_RE patterns
      with one that matches any string.
    * Loads this package's ``arv-cwl-schema.yml`` as the custom schema for
      the ``http://arvados.org/cwl`` namespace under CWL v1.0.
    * Adds the Arvados extension requirement URIs to
      ``cwltool.process.supportedProcessRequirements``.

    May be called more than once (main() calls it on every invocation):
    URIs that are already registered are not appended again.
    """
    # NOTE(review): cwltool appears to use these patterns to vet file names;
    # matching ".*" lifts that restriction — confirm against cwltool.
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE

    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    finally:
        # resource_stream() hands back an open file-like object; release it
        # once the schema text has been read.
        res.close()

    arv_requirements = (
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement",
        "http://arvados.org/cwl#ClusterTarget",
    )
    supported = cwltool.process.supportedProcessRequirements
    for requirement in arv_requirements:
        # Avoid growing the shared module-level list on repeated calls.
        if requirement not in supported:
            supported.append(requirement)
def exit_signal_handler(sigcode, frame):
    """Log the caught signal, then exit the process.

    Has the (signum, frame) signature that signal.signal() handlers take.
    Exits by raising SystemExit with status ``-sigcode``.

    NOTE(review): main() in this module installs handlers through
    arv_cmd.install_signal_handlers(), not this function; it may be used
    elsewhere — confirm before changing its behavior.
    """
    logger.error("Caught signal {}, exiting.".format(sigcode))
    sys.exit(-sigcode)
def _configure_logging(arvargs):
    """Apply the --debug/--quiet/--metrics/--log-timestamps options to logging."""
    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))


def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    """Command line entry point: run a CWL workflow using the Arvados executor.

    Args:
        args: argument list handed to the parser's parse_args().
        stdout, stderr: streams passed through to cwltool.main.main().
        api_client: optional Arvados API client. When None, a
            ThreadSafeApiCache is created, and its ``keep`` client replaces
            any ``keep_client`` that was passed in.
        keep_client: optional Keep client; a KeepClient is built from
            ``api_client`` when still None.
        install_sig_handlers: when true, call
            arv_cmd.install_signal_handlers().

    Returns:
        1 if setup fails here (unsupported storage classes, an
        --update-workflow/--api mismatch, or an exception while creating the
        clients/executor); otherwise the value returned by
        cwltool.main.main().
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")
        return 1

    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
        # The type infix right after the 5-character prefix of the UUID tells
        # us which API the object belongs to: "-7fd4e-" is handled as a
        # workflow (containers API), "-p5p6p-" as a pipeline template
        # (jobs API).
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        # Only override --api when the UUID type was recognised; otherwise
        # keep whatever the user asked for instead of resetting it to None.
        if want_api:
            arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        # Registering a workflow needs no inputs; supply an empty job order.
        job_order_object = ({}, "")

    add_arv_hints()

    # Fill in any cwltool option that our parser does not define.
    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(
                api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
                keep_params={"num_retries": 4})
            keep_client = api_client.keep
            # Make an API object now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        logger.error(e)
        return 1

    _configure_logging(arvargs)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=executor.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=executor.loadingContext,
                             runtimeContext=executor.runtimeContext)