# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
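#
# A hypothetical invocation, shown for illustration only (the actual flags
# are defined in arg_parser() below):
#
#   arvados-cwl-runner --api=containers --project-uuid=<project-uuid> my-workflow.cwl my-inputs.yml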

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import cwltool.draft2tool
from schema_salad.sourceline import SourceLine

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """
    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
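
        # Choose the work submission API ("jobs" or "containers") by probing
        # the API server's discovery document, honoring an explicit work_api
        # request when one was given.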
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
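
    # on_message() handles state-change events for submitted work. Events
    # arrive from the poll_states() thread below and use the same
    # {"object_uuid", "event_type", "properties"} shape as Arvados log
    # events; they update process state and fire completion callbacks.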
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()
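
    # label() renders "[job xyzzy]" or "[container xyzzy]" for log messages;
    # work_api[0:-1] just strips the trailing "s" from "jobs"/"containers".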
    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            with self.cond:
                self.processes.clear()
                self.cond.notify()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)
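
    # make_output_collection() gathers every file and directory reference in
    # the output object, copies the underlying data into a single new Keep
    # collection, and rewrites the references to point at that collection
    # (also writing a cwl.output.json into it and tagging it as requested).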
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)
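
    # arv_executor() is the entry point that cwltool.main drives: it uploads
    # workflow dependencies, optionally registers a workflow/template or
    # submits a runner job, then iterates over runnable jobs while the
    # poll_states() thread feeds completion events back via on_message().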
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                                        kwargs["name"],
                                        tool.doc_loader,
                                        tool.tool,
                                        tool.tool["id"],
                                        False)
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if self.stop_polling.is_set():
                    break
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)
519 """Print version string of key packages for provenance and debugging."""
521 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
522 arvpkg = pkg_resources.require("arvados-python-client")
523 cwlpkg = pkg_resources.require("cwltool")
525 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
526 "arvados-python-client", arvpkg[0].version,
527 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")
561 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
562 parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
563 parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
564 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
565 help="Ignore Docker image version when deciding whether to reuse past jobs.",

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                        dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
594 parser.add_argument("--compute-checksum", action="store_true", default=False,
595 help="Compute checksum of contents while collecting outputs",
596 dest="compute_checksum")
598 parser.add_argument("--submit-runner-ram", type=int,
599 help="RAM (in MiB) required for the workflow runner job (default 1024)",
602 parser.add_argument("--submit-runner-image", type=str,
603 help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
606 parser.add_argument("--name", type=str,
607 help="Name to use for workflow execution instance.",
610 parser.add_argument("--on-error", type=str,
611 help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
612 "Default is 'continue'.", default="continue", choices=("stop", "continue"))
614 parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
615 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")


def add_arv_hints():
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement"
    ])

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)
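
# A minimal sketch of how main() is typically wired up as a command-line
# entry point; the real console script is generated by setuptools from the
# package's entry points, so this block is illustrative only:
#
#   if __name__ == "__main__":
#       sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))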