3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
14 from functools import partial
15 import pkg_resources # part of setuptools
17 from cwltool.errors import WorkflowException
19 import cwltool.workflow
20 import cwltool.process
22 from schema_salad.sourceline import SourceLine
26 from arvados.keep import KeepClient
27 from arvados.errors import ApiError
29 from .arvcontainer import ArvadosContainer, RunnerContainer
30 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
31 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
32 from .arvtool import ArvadosCommandTool
33 from .arvworkflow import ArvadosWorkflow, upload_workflow
34 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
35 from .perf import Perf
36 from .pathmapper import NoFollowPathMapper
37 from ._version import __version__
39 from cwltool.pack import pack
40 from cwltool.process import shortname, UnsupportedRequirement, getListing
41 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
42 from cwltool.draft2tool import compute_checksums
43 from arvados.api import OrderedJsonModel
# Module-wide loggers: "logger" carries normal runner messages, "metrics"
# carries timing data emitted via the Perf helper.
45 logger = logging.getLogger('arvados.cwl-runner')
46 metrics = logging.getLogger('arvados.cwl-runner.metrics')
47 logger.setLevel(logging.INFO)
# Install a timestamped format on the shared arvados log handler; main()
# later replaces this formatter depending on --log-timestamps.
49 arvados.log_handler.setFormatter(logging.Formatter(
50 '%(asctime)s %(name)s %(levelname)s: %(message)s',
# NOTE(review): this listing is elided -- gaps in the embedded numbering mark
# missing original lines.  Comments describe only the visible code.
53 class ArvCwlRunner(object):
54 """Execute a CWL tool or workflow, submit work (using either jobs or
55 containers API), wait for them to complete, and report output.
# Constructor: records the API/Keep clients plus the synchronization
# primitives shared by the polling thread and the output callback.
59 def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
62 self.lock = threading.Lock()
63 self.cond = threading.Condition(self.lock)
# final_output / final_status are populated by output_callback().
64 self.final_output = None
65 self.final_status = None
67 self.num_retries = num_retries
# Event used to tell the poll_states() thread to shut down.
69 self.stop_polling = threading.Event()
72 self.final_output_collection = None
73 self.output_name = output_name
74 self.output_tags = output_tags
75 self.project_uuid = None
# Use the caller-supplied Keep client when given; otherwise construct one
# from the API client (the else line is elided in this listing).
77 if keep_client is not None:
78 self.keep_client = keep_client
80 self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
# Probe the API discovery document to choose a supported work API
# ("jobs" or "containers"), honoring an explicit work_api request.
83 expected_api = ["jobs", "containers"]
84 for api in expected_api:
86 methods = self.api._rootDesc.get('resources')[api]['methods']
87 if ('httpMethod' in methods['create'] and
88 (work_api == api or work_api is None)):
# No usable API found: distinguish "nothing supported" from "requested
# API not supported" (intervening lines elided).
96 raise Exception("No supported APIs")
98 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
# Tool factory handed to cwltool: wraps CommandLineTool/Workflow documents
# in their Arvados-specific implementations.
100 def arv_make_tool(self, toolpath_object, **kwargs):
101 kwargs["work_api"] = self.work_api
# Resolve file references through Keep collections instead of plain fetch.
102 kwargs["fetcher_constructor"] = partial(CollectionFetcher,
104 keep_client=self.keep_client)
105 if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
106 return ArvadosCommandTool(self, toolpath_object, **kwargs)
107 elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
108 return ArvadosWorkflow(self, toolpath_object, **kwargs)
# Anything else falls through to cwltool's default factory (the else line
# is elided in this listing).
110 return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
# Final-output callback passed to cwltool: records the overall run status
# and, when a pipeline instance exists, updates its state on the API server.
112 def output_callback(self, out, processStatus):
113 if processStatus == "success":
114 logger.info("Overall process status is %s", processStatus)
116 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
117 body={"state": "Complete"}).execute(num_retries=self.num_retries)
# Non-success path (its condition line is elided): warn and mark Failed.
119 logger.warn("Overall process status is %s", processStatus)
121 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
122 body={"state": "Failed"}).execute(num_retries=self.num_retries)
# Stash the results for arv_executor to pick up after its run loop exits.
123 self.final_status = processStatus
124 self.final_output = out
# Event handler for job/container state-change events (from the websocket
# client or synthesized by poll_states).  self.processes maps object uuid
# to the runner-side process wrapper.
126 def on_message(self, event):
127 if "object_uuid" in event:
128 if event["object_uuid"] in self.processes and event["event_type"] == "update":
# Transition into Running: log it once and update the pipeline component.
129 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
130 uuid = event["object_uuid"]
132 j = self.processes[uuid]
133 logger.info("%s %s is Running", self.label(j), uuid)
135 j.update_pipeline_component(event["properties"]["new_attributes"])
# Terminal states: log and invoke the process's done() handler, timed
# under the metrics logger.
136 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
137 uuid = event["object_uuid"]
140 j = self.processes[uuid]
141 logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
142 with Perf(metrics, "done %s" % j.name):
143 j.done(event["properties"]["new_attributes"])
148 def label(self, obj):
149 return "[%s %s]" % (self.work_api[0:-1], obj.name)
151 def poll_states(self):
152 """Poll status of jobs or containers listed in the processes dict.
154 Runs in a separate thread.
# Wake every 15 seconds, or immediately once stop_polling is set.
159 self.stop_polling.wait(15)
160 if self.stop_polling.is_set():
163 keys = self.processes.keys()
# Query the table matching the active work API.
167 if self.work_api == "containers":
168 table = self.poll_api.container_requests()
169 elif self.work_api == "jobs":
170 table = self.poll_api.jobs()
173 proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
174 except Exception as e:
# Transient API errors are non-fatal: warn and retry next cycle.
175 logger.warn("Error checking states on API server: %s", e)
# Synthesize "update" events so polled state changes flow through the same
# on_message() path as websocket events.
178 for p in proc_states["items"]:
180 "object_uuid": p["uuid"],
181 "event_type": "update",
# Fatal error in the thread: log it, drop tracked processes, and set
# stop_polling so the main loop can shut down.
187 logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
189 self.processes.clear()
193 self.stop_polling.set()
195 def get_uploaded(self):
196 return self.uploaded.copy()
198 def add_uploaded(self, src, pair):
199 self.uploaded[src] = pair
# Recursively validate a tool document, raising UnsupportedRequirement
# (with source-line context) for CWL features the selected work API
# cannot support.
201 def check_features(self, obj):
202 if isinstance(obj, dict):
203 if obj.get("writable"):
204 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
205 if obj.get("class") == "CommandLineTool":
206 if self.work_api == "containers":
# NOTE(review): the stdin condition line is elided in this listing;
# stdin/stderr redirection are both rejected under --api=containers.
208 raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
209 if obj.get("stderr"):
210 raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
211 if obj.get("class") == "DockerRequirement":
212 if obj.get("dockerOutputDirectory"):
213 # TODO: can be supported by containers API, but not jobs API.
214 raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
215 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
# Recurse into nested dict values (Python 2 iteration).
216 for v in obj.itervalues():
217 self.check_features(v)
218 elif isinstance(obj, list):
# SourceLine context attributes any nested error to the list index.
219 for i,v in enumerate(obj):
220 with SourceLine(obj, i, UnsupportedRequirement):
221 self.check_features(v)
# Assemble the workflow's final outputs into a single new Keep collection,
# rewrite output file locations to point into it, tag it, and return the
# rewritten output object together with the saved collection.
223 def make_output_collection(self, name, tagsString, outputObj):
# Work on a copy so the caller's structure is not mutated mid-rewrite.
224 outputObj = copy.deepcopy(outputObj)
# Gather every File/Directory object referenced by the output.
227 def capture(fileobj):
228 files.append(fileobj)
230 adjustDirObjs(outputObj, capture)
231 adjustFileObjs(outputObj, capture)
233 generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
235 final = arvados.collection.Collection(api_client=self.api,
236 keep_client=self.keep_client,
237 num_retries=self.num_retries)
# Copy each mapped item into the new collection: "_:" targets are
# literals created in place; everything else must come from Keep.
240 for k,v in generatemapper.items():
241 if k.startswith("_:"):
242 if v.type == "Directory":
244 if v.type == "CreateFile":
245 with final.open(v.target, "wb") as f:
246 f.write(v.resolved.encode("utf-8"))
249 if not k.startswith("keep:"):
250 raise Exception("Output source is not in keep or a literal")
# Cache one CollectionReader per source collection identifier.
252 srccollection = sp[0][5:]
253 if srccollection not in srccollections:
255 srccollections[srccollection] = arvados.collection.CollectionReader(
258 keep_client=self.keep_client,
259 num_retries=self.num_retries)
260 except arvados.errors.ArgumentError as e:
261 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
263 reader = srccollections[srccollection]
265 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
# overwrite=False: keep the first copy if the same target appears twice.
266 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
268 logger.warn("While preparing output collection: %s", e)
# Rewrite locations to collection-relative targets and drop stale fields.
270 def rewrite(fileobj):
271 fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
272 for k in ("basename", "listing", "contents"):
276 adjustDirObjs(outputObj, rewrite)
277 adjustFileObjs(outputObj, rewrite)
# Store the rewritten cwl.output.json inside the collection itself.
279 with final.open("cwl.output.json", "w") as f:
280 json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
282 final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
284 logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
285 final.api_response()["name"],
286 final.manifest_locator())
# Apply the comma-separated tags as "tag" links on the new collection.
288 final_uuid = final.manifest_locator()
289 tags = tagsString.split(',')
291 self.api.links().create(body={
292 "head_uuid": final_uuid, "link_class": "tag", "name": tag
293 }).execute(num_retries=self.num_retries)
# Finally point every location at the saved collection's portable data hash.
295 def finalcollection(fileobj):
296 fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
298 adjustDirObjs(outputObj, finalcollection)
299 adjustFileObjs(outputObj, finalcollection)
301 return (outputObj, final)
# When the runner itself executes inside crunch, record the final output
# collection on our own container record (containers API) or job task
# (jobs API, located via the TASK_UUID environment variable).
303 def set_crunch_output(self):
304 if self.work_api == "containers":
306 current = self.api.containers().current().execute(num_retries=self.num_retries)
307 except ApiError as e:
308 # Status code 404 just means we're not running in a container.
309 if e.resp.status != 404:
310 logger.info("Getting current container: %s", e)
313 self.api.containers().update(uuid=current['uuid'],
315 'output': self.final_output_collection.portable_data_hash(),
316 }).execute(num_retries=self.num_retries)
317 except Exception as e:
# Best effort: failing to record the output is logged, not fatal.
318 logger.info("Setting container output: %s", e)
319 elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
320 self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
322 'output': self.final_output_collection.portable_data_hash(),
323 'success': self.final_status == "success",
325 }).execute(num_retries=self.num_retries)
# Main executor handed to cwltool.main: validates the tool against the
# selected work API, uploads dependencies, then either creates a workflow/
# template record, submits a runner job, or runs the workflow locally while
# polling job/container states.  Returns (output, status) for cwltool.
# NOTE(review): many original lines are elided in this listing.
327 def arv_executor(self, tool, job_order, **kwargs):
328 self.debug = kwargs.get("debug")
# Reject CWL features unsupported by the chosen API up front.
330 tool.visit(self.check_features)
332 self.project_uuid = kwargs.get("project_uuid")
334 make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
336 keep_client=self.keep_client)
337 self.fs_access = make_fs_access(kwargs["basedir"])
# Default the run name from the tool's label or its file name.
339 if not kwargs.get("name"):
340 kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
342 # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
343 # Also uploads docker images.
344 upload_workflow_deps(self, tool)
346 # Reload tool object which may have been updated by
347 # upload_workflow_deps
348 tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
349 makeTool=self.arv_make_tool,
350 loader=tool.doc_loader,
351 avsc_names=tool.doc_schema,
352 metadata=tool.metadata)
354 # Upload local file references in the job order.
355 job_order = upload_job_order(self, "%s input" % kwargs["name"],
# --create-workflow / --update-workflow: register the record and exit
# without running anything.
358 existing_uuid = kwargs.get("update_workflow")
359 if existing_uuid or kwargs.get("create_workflow"):
360 # Create a pipeline template or workflow record and exit.
361 if self.work_api == "jobs":
362 tmpl = RunnerTemplate(self, tool, job_order,
363 kwargs.get("enable_reuse"),
365 submit_runner_ram=kwargs.get("submit_runner_ram"),
368 # cwltool.main will write our return value to stdout.
369 return (tmpl.uuid, "success")
370 elif self.work_api == "containers":
371 return (upload_workflow(self, tool, job_order,
374 submit_runner_ram=kwargs.get("submit_runner_ram"),
375 name=kwargs["name"]),
378 self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
# Execution-time kwargs passed down into every job/container created.
380 kwargs["make_fs_access"] = make_fs_access
381 kwargs["enable_reuse"] = kwargs.get("enable_reuse")
382 kwargs["use_container"] = True
383 kwargs["tmpdir_prefix"] = "tmp"
384 kwargs["compute_checksum"] = kwargs.get("compute_checksum")
# Work directories differ between the two APIs: fixed container paths vs
# crunch job task substitution variables.
386 if self.work_api == "containers":
387 kwargs["outdir"] = "/var/spool/cwl"
388 kwargs["docker_outdir"] = "/var/spool/cwl"
389 kwargs["tmpdir"] = "/tmp"
390 kwargs["docker_tmpdir"] = "/tmp"
391 elif self.work_api == "jobs":
392 kwargs["outdir"] = "$(task.outdir)"
393 kwargs["docker_outdir"] = "$(task.outdir)"
394 kwargs["tmpdir"] = "$(task.tmpdir)"
# --submit: wrap the workflow in a runner job/container that re-runs this
# program on the cluster.
397 if kwargs.get("submit"):
398 # Submit a runner job to run the workflow for us.
399 if self.work_api == "containers":
400 if tool.tool["class"] == "CommandLineTool":
401 kwargs["runnerjob"] = tool.tool["id"]
402 upload_dependencies(self,
408 runnerjob = tool.job(job_order,
409 self.output_callback,
412 runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
415 submit_runner_ram=kwargs.get("submit_runner_ram"),
416 name=kwargs.get("name"),
417 on_error=kwargs.get("on_error"),
418 submit_runner_image=kwargs.get("submit_runner_image"))
419 elif self.work_api == "jobs":
420 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
423 submit_runner_ram=kwargs.get("submit_runner_ram"),
424 name=kwargs.get("name"),
425 on_error=kwargs.get("on_error"),
426 submit_runner_image=kwargs.get("submit_runner_image"))
427 
428 if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
429 # Create pipeline for local run
430 self.pipeline = self.api.pipeline_instances().create(
432 "owner_uuid": self.project_uuid,
433 "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
435 "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
436 logger.info("Pipeline instance %s", self.pipeline["uuid"])
# --no-wait: kick off the runner and return its uuid immediately.
438 if runnerjob and not kwargs.get("wait"):
439 runnerjob.run(wait=kwargs.get("wait"))
440 return (runnerjob.uuid, "success")
# Start the background state-polling thread for the run loop below.
442 self.poll_api = arvados.api('v1')
443 self.polling_thread = threading.Thread(target=self.poll_states)
444 self.polling_thread.start()
447 jobiter = iter((runnerjob,))
449 if "cwl_runner_job" in kwargs:
450 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
451 jobiter = tool.job(job_order,
452 self.output_callback,
457 # Will continue to hold the lock for the duration of this code
458 # except when in cond.wait(), at which point on_message can update
459 # job state and process output callbacks.
461 loopperf = Perf(metrics, "jobiter")
463 for runnable in jobiter:
466 if self.stop_polling.is_set():
470 with Perf(metrics, "run"):
471 runnable.run(**kwargs)
476 logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
# Drain: wait (via cond) until all tracked processes have finished.
481 while self.processes:
484 except UnsupportedRequirement:
487 if sys.exc_info()[0] is KeyboardInterrupt:
488 logger.error("Interrupted, marking pipeline as failed")
490 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
492 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
493 body={"state": "Failed"}).execute(num_retries=self.num_retries)
# Cancel an in-flight runner container by dropping its priority to 0.
494 if runnerjob and runnerjob.uuid and self.work_api == "containers":
495 self.api.container_requests().update(uuid=runnerjob.uuid,
496 body={"priority": "0"}).execute(num_retries=self.num_retries)
# Always stop and join the polling thread before reporting results.
499 self.stop_polling.set()
500 self.polling_thread.join()
502 if self.final_status == "UnsupportedRequirement":
503 raise UnsupportedRequirement("Check log for details.")
505 if self.final_output is None:
506 raise WorkflowException("Workflow did not return a result.")
508 if kwargs.get("submit") and isinstance(runnerjob, Runner):
509 logger.info("Final output collection %s", runnerjob.final_output)
# Local run: gather outputs into a new collection and record it.
511 if self.output_name is None:
512 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
513 if self.output_tags is None:
514 self.output_tags = ""
515 self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
516 self.set_crunch_output()
518 if kwargs.get("compute_checksum"):
519 adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
520 adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
522 return (self.final_output, self.final_status)
# Body of versionstring() (its def line is elided in this listing): report
# the versions of the three key packages for provenance records.
526 """Print version string of key packages for provenance and debugging."""
528 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
529 arvpkg = pkg_resources.require("arvados-python-client")
530 cwlpkg = pkg_resources.require("cwltool")
532 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
533 "arvados-python-client", arvpkg[0].version,
534 "cwltool", cwlpkg[0].version)
# Build the command line parser.  Mutually-exclusive groups pair each
# boolean option with its negation (e.g. --enable/--disable-reuse).
537 def arg_parser():  # type: () -> argparse.ArgumentParser
538 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
540 parser.add_argument("--basedir", type=str,
541 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
542 parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
543 help="Output directory, default current directory")
545 parser.add_argument("--eval-timeout",
546 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
549 parser.add_argument("--version", action="store_true", help="Print version and exit")
# Logging verbosity: exactly one of verbose/quiet/debug.
551 exgroup = parser.add_mutually_exclusive_group()
552 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
553 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
554 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
556 parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
558 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
# Job reuse defaults to enabled; --disable-reuse flips the same dest.
560 exgroup = parser.add_mutually_exclusive_group()
561 exgroup.add_argument("--enable-reuse", action="store_true",
562 default=True, dest="enable_reuse",
564 exgroup.add_argument("--disable-reuse", action="store_false",
565 default=True, dest="enable_reuse",
568 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
569 parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
570 parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
571 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
572 help="Ignore Docker image version when deciding whether to reuse past jobs.",
# Run mode: submit (default), local, or create/update a workflow record.
575 exgroup = parser.add_mutually_exclusive_group()
576 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
577 default=True, dest="submit")
578 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
579 default=True, dest="submit")
580 exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
581 dest="create_workflow")
582 exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
583 exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
585 exgroup = parser.add_mutually_exclusive_group()
586 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
587 default=True, dest="wait")
588 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
589 default=True, dest="wait")
591 exgroup = parser.add_mutually_exclusive_group()
592 exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
593 default=True, dest="log_timestamps")
594 exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
595 default=True, dest="log_timestamps")
597 parser.add_argument("--api", type=str,
598 default=None, dest="work_api",
599 help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
601 parser.add_argument("--compute-checksum", action="store_true", default=False,
602 help="Compute checksum of contents while collecting outputs",
603 dest="compute_checksum")
605 parser.add_argument("--submit-runner-ram", type=int,
606 help="RAM (in MiB) required for the workflow runner job (default 1024)",
609 parser.add_argument("--submit-runner-image", type=str,
610 help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
613 parser.add_argument("--name", type=str,
614 help="Name to use for workflow execution instance.",
617 parser.add_argument("--on-error", type=str,
618 help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
619 "Default is 'continue'.", default="continue", choices=("stop", "continue"))
# Positional arguments: the workflow document plus its input object(s).
621 parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
622 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
# Body of add_arv_hints() (its def line is elided in this listing): load the
# Arvados CWL extension schema and merge its names into cwltool's schema.
628 res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
629 cache["http://arvados.org/cwl"] = res.read()
631 document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
632 _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
633 for n in extnames.names:
634 if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
635 cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
# Register an empty index entry so the loader treats extension URIs as known.
636 document_loader.idx["http://arvados.org/cwl#"+n] = {}
# Command-line entry point: parse args, reconcile --update-workflow with
# --api, build the runner, configure logging, and delegate to cwltool.main
# with Arvados-specific factories.  NOTE(review): elided lines omitted.
638 def main(args, stdout, stderr, api_client=None, keep_client=None):
639 parser = arg_parser()
641 job_order_object = None
642 arvargs = parser.parse_args(args)
# Python 2 print statement.
645 print versionstring()
# Infer the work API from the uuid's type infix: -7fd4e- is a workflow
# (containers API), -p5p6p- is a pipeline template (jobs API).
648 if arvargs.update_workflow:
649 if arvargs.update_workflow.find('-7fd4e-') == 5:
650 want_api = 'containers'
651 elif arvargs.update_workflow.find('-p5p6p-') == 5:
655 if want_api and arvargs.work_api and want_api != arvargs.work_api:
656 logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
657 arvargs.update_workflow, want_api, arvargs.work_api))
659 arvargs.work_api = want_api
# Creating/updating a workflow record needs no real input object.
661 if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
662 job_order_object = ({}, "")
# Build default API/Keep clients when the caller did not inject them
# (tests inject their own).
667 if api_client is None:
668 api_client=arvados.api('v1', model=OrderedJsonModel())
669 if keep_client is None:
670 keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
671 runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
672 num_retries=4, output_name=arvargs.output_name,
673 output_tags=arvargs.output_tags)
674 except Exception as e:
# Logging levels according to --debug / --quiet (branch lines elided).
679 logger.setLevel(logging.DEBUG)
680 logging.getLogger('arvados').setLevel(logging.DEBUG)
683 logger.setLevel(logging.WARN)
684 logging.getLogger('arvados').setLevel(logging.WARN)
685 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
688 metrics.setLevel(logging.DEBUG)
689 logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
691 if arvargs.log_timestamps:
692 arvados.log_handler.setFormatter(logging.Formatter(
693 '%(asctime)s %(name)s %(levelname)s: %(message)s',
694 '%Y-%m-%d %H:%M:%S'))
696 arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
# Fixed settings expected by cwltool regardless of command line.
698 arvargs.conformance_test = None
699 arvargs.use_container = True
700 arvargs.relax_path_checks = True
701 arvargs.validate = None
# Delegate to cwltool with Arvados implementations of the executor, tool
# factory, filesystem access, fetcher and uuid resolver.
703 return cwltool.main.main(args=arvargs,
706 executor=runner.arv_executor,
707 makeTool=runner.arv_make_tool,
708 versionfunc=versionstring,
709 job_order_object=job_order_object,
710 make_fs_access=partial(CollectionFsAccess,
711 api_client=api_client,
712 keep_client=keep_client),
713 fetcher_constructor=partial(CollectionFetcher,
714 api_client=api_client,
715 keep_client=keep_client,
716 num_retries=runner.num_retries),
717 resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
718 logger_handler=arvados.log_handler)