closes #10215
[arvados.git] sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python

# Implement the cwl-runner interface for submitting and running work on
# Arvados, using either the Crunch jobs API or the Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs
    or containers API), wait for it to complete, and report output.

    """

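    # A minimal usage sketch (assumptions: ARVADOS_API_HOST and
    # ARVADOS_API_TOKEN are configured in the environment, and hypothetical
    # `tool` and `job_order` objects have already been loaded by cwltool):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="containers")
    #   output = runner.arv_executor(tool, job_order, basedir=".")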
    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

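        # Probe the API server's discovery document for the work APIs it
        # offers; if the caller asked for a specific API, accept only that.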
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                        (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if toolpath_object.get("class") == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif toolpath_object.get("class") == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

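    # Events handled by on_message() look roughly like this; the same shape
    # is synthesized from list results by poll_states() below:
    #
    #   {"object_uuid": "...", "event_type": "update",
    #    "properties": {"new_attributes": {"state": "Running", ...}}}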
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
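                        # Singularize the work API name for logging:
                        # "jobs" -> "Job", "containers" -> "Container".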
                        txt = self.work_api[0].upper() + self.work_api[1:-1]
                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def make_output_collection(self, name, tagsString, outputObj):
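        """Copy the workflow outputs into a single new collection.

        Copies every output file and directory into one collection, rewrites
        the output object to point at it, writes cwl.output.json, saves the
        collection under self.project_uuid, and tags it.
        """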
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

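            # Anything that isn't a literal ("_:...") must be a Keep
            # reference of the form "keep:<portable data hash>/<path>";
            # strip the "keep:" prefix to find the source collection.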
            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        # Skip tagging entirely when no tags were requested.
        tags = tagsString.split(',') if tagsString else []
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
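        """Record the final output on the runner's own work item.

        When running inside a container (containers API) or a crunch job
        task (jobs API), set that record's output to the final output
        collection so the workflow output is visible from the runner.
        """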
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

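        # The containers API runs tools at fixed paths inside the container,
        # while the jobs API substitutes crunch task variables such as
        # $(task.outdir) at runtime.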
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

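        # Upload local file dependencies referenced by the workflow and job
        # order into Keep before scheduling any work.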
        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Control the workflow from the local host (individual jobs are still submitted to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser


def add_arv_hints():
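    """Register the Arvados CWL extension schema with cwltool.

    Merges the names from arv-cwl-schema.yml (the http://arvados.org/cwl
    namespace) into cwltool's CWL v1.0 schema, so Arvados-specific hints
    validate instead of being rejected as unknown extensions.
    """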
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

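    # An Arvados UUID has the form <cluster>-<type infix>-<15 chars>; the
    # five-character infix at offset 5 identifies the object type
    # ("7fd4e" = workflow, "p5p6p" = pipeline template), which implies the
    # work API the object to update belongs to.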
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

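    # Settings forced for Arvados: tools always execute inside containers,
    # and cwltool's strict file name checks are relaxed because Keep
    # collections may contain names that would not pass them.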
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))