# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
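#
# Typical invocation via the command-line entry point (values here are
# illustrative, not taken from this file):
#
#   arvados-cwl-runner --api=containers --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx \
#       workflow.cwl inputs.yml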
import argparse
import copy
import json
import logging
import os
import re
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import cwltool.draft2tool

import schema_salad.schema
from schema_salad.sourceline import SourceLine

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
    '%(asctime)s %(name)s %(levelname)s: %(message)s',
    '%Y-%m-%d %H:%M:%S'))
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """
    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
        # Pick the first supported API that the server's discovery document
        # advertises and that is compatible with the caller's request.
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass
        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
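    # Callback invoked when the overall workflow finishes: record the outcome
    # and, when a pipeline instance was created for this run, mark it Complete
    # or Failed to match.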
    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
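    # Handle a state-change event for a tracked job or container request.
    # poll_states() below synthesizes events in the
    # {"object_uuid", "event_type", "properties"} shape consumed here, so any
    # event source using the same shape shares this single code path.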
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()
    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()
                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue
                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            with self.cond:
                self.processes.clear()
                self.cond.notify()
        finally:
            self.stop_polling.set()
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair
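    # Recursively scan a tool document and reject CWL features this runner
    # cannot honor, pointing at the offending source line so the user can find
    # it in the original file.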
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)
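    # Assemble the workflow output into a single new collection: literal
    # "CreateFile" outputs are written directly, and everything else must
    # already live in Keep and is copied in by reference, so no file data is
    # downloaded or re-uploaded.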
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
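    # When this runner is itself executing inside a Crunch container or job,
    # record the final output on that container/job record so the parent
    # process can find it.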
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                # The container record now points at the output, so the
                # runner's own copy of the collection is redundant.
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)
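    # Main entry point called from cwltool: uploads dependencies, then either
    # registers a workflow/template, submits a runner job, or runs the
    # workflow from this process, depending on the options passed in kwargs.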
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)
        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"
            kwargs["docker_tmpdir"] = "$(task.tmpdir)"
        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                                        kwargs["name"],
                                        tool.doc_loader,
                                        tool.tool,
                                        tool.tool["id"],
                                        False)
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))
        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)
        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if self.stop_polling.is_set():
                    break
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()
        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)
525 """Print version string of key packages for provenance and debugging."""
527 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
528 arvpkg = pkg_resources.require("arvados-python-client")
529 cwlpkg = pkg_resources.require("cwltool")
531 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
532 "arvados-python-client", arvpkg[0].version,
533 "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
def add_arv_hints():
    cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache = {}
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
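# Command-line entry point: parse arguments, construct an ArvCwlRunner, and
# hand control to cwltool.main with our executor and Arvados-aware hooks
# plugged in.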
def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1
    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client,
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler)