3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
14 from functools import partial
15 import pkg_resources # part of setuptools
17 from cwltool.errors import WorkflowException
19 import cwltool.workflow
20 import cwltool.process
22 from schema_salad.sourceline import SourceLine
26 from arvados.keep import KeepClient
27 from arvados.errors import ApiError
29 from .arvcontainer import ArvadosContainer, RunnerContainer
30 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
31 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
32 from .arvtool import ArvadosCommandTool
33 from .arvworkflow import ArvadosWorkflow, upload_workflow
34 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
35 from .perf import Perf
36 from .pathmapper import NoFollowPathMapper
37 from ._version import __version__
39 from cwltool.pack import pack
40 from cwltool.process import shortname, UnsupportedRequirement, getListing
41 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
42 from cwltool.draft2tool import compute_checksums
43 from arvados.api import OrderedJsonModel
# Module-level loggers: 'arvados.cwl-runner' for normal progress output and
# the '.metrics' child used by the Perf timing context manager below.
45 logger = logging.getLogger('arvados.cwl-runner')
46 metrics = logging.getLogger('arvados.cwl-runner.metrics')
47 logger.setLevel(logging.INFO)
# Install a timestamped formatter on the shared arvados log handler.
# NOTE(review): this listing is line-sampled; the closing argument and
# parenthesis of this setFormatter call fall on elided lines.
49 arvados.log_handler.setFormatter(logging.Formatter(
50 '%(asctime)s %(name)s %(levelname)s: %(message)s',
# NOTE(review): this listing is line-sampled -- the number at the start of
# each line is the original file line number, and gaps between those numbers
# mark elided code (else-branches, try/finally headers, continuation lines).
# Comments below describe only what the visible lines establish.
53 class ArvCwlRunner(object):
54 """Execute a CWL tool or workflow, submit work (using either jobs or
55 containers API), wait for them to complete, and report output.
# Constructor: records the API client, Keep client, retry policy and output
# naming options, and sets up the lock/condition pair that coordinates the
# polling thread with the main execution loop in arv_executor.
59 def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
62 self.lock = threading.Lock()
63 self.cond = threading.Condition(self.lock)
64 self.final_output = None
65 self.final_status = None
67 self.num_retries = num_retries
69 self.stop_polling = threading.Event()
72 self.final_output_collection = None
73 self.output_name = output_name
74 self.output_tags = output_tags
75 self.project_uuid = None
# Reuse a caller-supplied KeepClient when given; otherwise build one that
# shares this runner's API client and retry count.  NOTE(review): the
# "else:" line between 78 and 80 is elided in this view.
77 if keep_client is not None:
78 self.keep_client = keep_client
80 self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
# Select the work submission API ("jobs" or "containers") by probing the
# server's discovery document; an explicit work_api request must match one
# of the supported APIs or an exception is raised.
83 expected_api = ["jobs", "containers"]
84 for api in expected_api:
86 methods = self.api._rootDesc.get('resources')[api]['methods']
87 if ('httpMethod' in methods['create'] and
88 (work_api == api or work_api is None)):
96 raise Exception("No supported APIs")
98 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
# Tool factory handed to cwltool: returns the Arvados-specific classes for
# CommandLineTool and Workflow, falling back to cwltool's default factory
# for anything else.  Also injects the work API and a Keep-aware fetcher.
100 def arv_make_tool(self, toolpath_object, **kwargs):
101 kwargs["work_api"] = self.work_api
102 kwargs["fetcher_constructor"] = partial(CollectionFetcher,
104 keep_client=self.keep_client)
105 if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
106 return ArvadosCommandTool(self, toolpath_object, **kwargs)
107 elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
108 return ArvadosWorkflow(self, toolpath_object, **kwargs)
110 return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
# Completion callback passed to tool.job(): records the final status and
# output object, and when running under a pipeline instance marks it
# Complete or Failed accordingly.
112 def output_callback(self, out, processStatus):
113 if processStatus == "success":
114 logger.info("Overall process status is %s", processStatus)
116 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
117 body={"state": "Complete"}).execute(num_retries=self.num_retries)
119 logger.warn("Overall process status is %s", processStatus)
121 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
122 body={"state": "Failed"}).execute(num_retries=self.num_retries)
123 self.final_status = processStatus
124 self.final_output = out
# Event handler for job/container state updates (fed by the websocket or
# the polling thread): logs Running transitions and calls done() on
# processes that reached a terminal state.
126 def on_message(self, event):
127 if "object_uuid" in event:
128 if event["object_uuid"] in self.processes and event["event_type"] == "update":
129 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
130 uuid = event["object_uuid"]
132 j = self.processes[uuid]
133 logger.info("%s %s is Running", self.label(j), uuid)
135 j.update_pipeline_component(event["properties"]["new_attributes"])
136 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
137 uuid = event["object_uuid"]
140 j = self.processes[uuid]
141 logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
142 with Perf(metrics, "done %s" % j.name):
143 j.done(event["properties"]["new_attributes"])
# Human-readable tag such as "[job foo]" / "[container foo]" -- work_api
# with its trailing "s" sliced off, plus the process name.
148 def label(self, obj):
149 return "[%s %s]" % (self.work_api[0:-1], obj.name)
151 def poll_states(self):
152 """Poll status of jobs or containers listed in the processes dict.
154 Runs in a separate thread.
# Wake every 15s (or immediately when stop_polling is set) and list the
# current state of every tracked uuid, synthesizing "update" events for
# on_message.  NOTE(review): the surrounding loop/try structure is partly
# elided in this sampled view.
159 self.stop_polling.wait(15)
160 if self.stop_polling.is_set():
163 keys = self.processes.keys()
167 if self.work_api == "containers":
168 table = self.poll_api.container_requests()
169 elif self.work_api == "jobs":
170 table = self.poll_api.jobs()
173 proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
174 except Exception as e:
# Transient API errors are logged and polling continues.
175 logger.warn("Error checking states on API server: %s", e)
178 for p in proc_states["items"]:
180 "object_uuid": p["uuid"],
181 "event_type": "update",
187 logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
189 self.processes.clear()
193 self.stop_polling.set()
# Accessors for the cache of files already uploaded to Keep.
195 def get_uploaded(self):
196 return self.uploaded.copy()
198 def add_uploaded(self, src, pair):
199 self.uploaded[src] = pair
# Recursively walk a tool document and raise UnsupportedRequirement (with
# source-line context) for CWL features this runner cannot execute.
201 def check_features(self, obj):
202 if isinstance(obj, dict):
203 if obj.get("writable"):
204 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
205 if obj.get("class") == "CommandLineTool":
206 if self.work_api == "containers":
208 raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
209 if obj.get("stderr"):
210 raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
211 if obj.get("class") == "DockerRequirement":
212 if obj.get("dockerOutputDirectory"):
213 # TODO: can be supported by containers API, but not jobs API.
214 raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
215 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
216 for v in obj.itervalues():
217 self.check_features(v)
218 elif isinstance(obj, list):
219 for i,v in enumerate(obj):
220 with SourceLine(obj, i, UnsupportedRequirement):
221 self.check_features(v)
# Assemble the workflow's final output into a single new collection:
# copy each referenced Keep file/literal into it, save it under the given
# name, tag it, and rewrite the output object's locations to point at it.
223 def make_output_collection(self, name, tagsString, outputObj):
# Deep-copy so location rewriting does not mutate the caller's object.
224 outputObj = copy.deepcopy(outputObj)
227 def capture(fileobj):
228 files.append(fileobj)
230 adjustDirObjs(outputObj, capture)
231 adjustFileObjs(outputObj, capture)
233 generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
235 final = arvados.collection.Collection(api_client=self.api,
236 keep_client=self.keep_client,
237 num_retries=self.num_retries)
# "_:" targets are literals created in place; everything else must be a
# keep: reference copied from its source collection (readers are cached
# per source collection in srccollections).
240 for k,v in generatemapper.items():
241 if k.startswith("_:"):
242 if v.type == "Directory":
244 if v.type == "CreateFile":
245 with final.open(v.target, "wb") as f:
246 f.write(v.resolved.encode("utf-8"))
249 if not k.startswith("keep:"):
250 raise Exception("Output source is not in keep or a literal")
252 srccollection = sp[0][5:]
253 if srccollection not in srccollections:
255 srccollections[srccollection] = arvados.collection.CollectionReader(
258 keep_client=self.keep_client,
259 num_retries=self.num_retries)
260 except arvados.errors.ArgumentError as e:
261 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
263 reader = srccollections[srccollection]
265 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
266 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
268 logger.warn("While preparing output collection: %s", e)
# Rewrite the output object so every location refers to the new
# collection, dropping attributes that no longer apply.
270 def rewrite(fileobj):
271 fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
272 for k in ("basename", "listing", "contents"):
276 adjustDirObjs(outputObj, rewrite)
277 adjustFileObjs(outputObj, rewrite)
# Also store the output object itself inside the collection.
279 with final.open("cwl.output.json", "w") as f:
280 json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
282 final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
284 logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
285 final.api_response()["name"],
286 final.manifest_locator())
# Apply comma-separated tags as "tag" links on the saved collection.
288 final_uuid = final.manifest_locator()
289 tags = tagsString.split(',')
291 self.api.links().create(body={
292 "head_uuid": final_uuid, "link_class": "tag", "name": tag
293 }).execute(num_retries=self.num_retries)
295 def finalcollection(fileobj):
296 fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
298 adjustDirObjs(outputObj, finalcollection)
299 adjustFileObjs(outputObj, finalcollection)
301 return (outputObj, final)
# When running inside a container (or a crunch job task), record the final
# output collection on the enclosing container/job_task record.  A 404
# from containers().current() just means we are not inside a container.
303 def set_crunch_output(self):
304 if self.work_api == "containers":
306 current = self.api.containers().current().execute(num_retries=self.num_retries)
307 except ApiError as e:
308 # Status code 404 just means we're not running in a container.
309 if e.resp.status != 404:
310 logger.info("Getting current container: %s", e)
313 self.api.containers().update(uuid=current['uuid'],
315 'output': self.final_output_collection.portable_data_hash(),
316 }).execute(num_retries=self.num_retries)
317 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
320 }).execute(num_retries=self.num_retries)
321 except Exception as e:
322 logger.info("Setting container output: %s", e)
323 elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
324 self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
326 'output': self.final_output_collection.portable_data_hash(),
327 'success': self.final_status == "success",
329 }).execute(num_retries=self.num_retries)
# Main entry point handed to cwltool.main as the executor: uploads
# dependencies, optionally creates/updates a workflow record or submits a
# runner job, otherwise iterates runnable jobs locally until done, then
# collects and reports the final output.
331 def arv_executor(self, tool, job_order, **kwargs):
332 self.debug = kwargs.get("debug")
# Reject unsupported CWL features up front.
334 tool.visit(self.check_features)
336 self.project_uuid = kwargs.get("project_uuid")
338 make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
340 keep_client=self.keep_client)
341 self.fs_access = make_fs_access(kwargs["basedir"])
342 if not kwargs.get("name"):
344 kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
346 # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
347 # Also uploads docker images.
348 upload_workflow_deps(self, tool)
350 # Reload tool object which may have been updated by
351 # upload_workflow_deps
352 tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
353 makeTool=self.arv_make_tool,
354 loader=tool.doc_loader,
355 avsc_names=tool.doc_schema,
356 metadata=tool.metadata)
358 # Upload local file references in the job order.
359 job_order = upload_job_order(self, "%s input" % kwargs["name"],
# --create-workflow / --update-workflow: register a pipeline template
# (jobs API) or workflow record (containers API) and return its uuid
# without executing anything.
362 existing_uuid = kwargs.get("update_workflow")
363 if existing_uuid or kwargs.get("create_workflow"):
364 # Create a pipeline template or workflow record and exit.
365 if self.work_api == "jobs":
366 tmpl = RunnerTemplate(self, tool, job_order,
367 kwargs.get("enable_reuse"),
369 submit_runner_ram=kwargs.get("submit_runner_ram"),
372 # cwltool.main will write our return value to stdout.
373 return (tmpl.uuid, "success")
374 elif self.work_api == "containers":
375 return (upload_workflow(self, tool, job_order,
378 submit_runner_ram=kwargs.get("submit_runner_ram"),
379 name=kwargs["name"]),
382 self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
# Normalize execution kwargs; outdir/tmpdir conventions differ between
# the containers API (fixed paths) and jobs API (crunch task variables).
384 kwargs["make_fs_access"] = make_fs_access
385 kwargs["enable_reuse"] = kwargs.get("enable_reuse")
386 kwargs["use_container"] = True
387 kwargs["tmpdir_prefix"] = "tmp"
388 kwargs["compute_checksum"] = kwargs.get("compute_checksum")
390 if self.work_api == "containers":
391 kwargs["outdir"] = "/var/spool/cwl"
392 kwargs["docker_outdir"] = "/var/spool/cwl"
393 kwargs["tmpdir"] = "/tmp"
394 kwargs["docker_tmpdir"] = "/tmp"
395 elif self.work_api == "jobs":
396 kwargs["outdir"] = "$(task.outdir)"
397 kwargs["docker_outdir"] = "$(task.outdir)"
398 kwargs["tmpdir"] = "$(task.tmpdir)"
# --submit: wrap the workflow in a runner job/container that executes it
# on the cluster instead of driving it from this process.
401 if kwargs.get("submit"):
402 # Submit a runner job to run the workflow for us.
403 if self.work_api == "containers":
404 if tool.tool["class"] == "CommandLineTool":
405 kwargs["runnerjob"] = tool.tool["id"]
406 upload_dependencies(self,
412 runnerjob = tool.job(job_order,
413 self.output_callback,
416 runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
419 submit_runner_ram=kwargs.get("submit_runner_ram"),
420 name=kwargs.get("name"),
421 on_error=kwargs.get("on_error"),
422 submit_runner_image=kwargs.get("submit_runner_image"))
423 elif self.work_api == "jobs":
424 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
427 submit_runner_ram=kwargs.get("submit_runner_ram"),
428 name=kwargs.get("name"),
429 on_error=kwargs.get("on_error"),
430 submit_runner_image=kwargs.get("submit_runner_image"))
432 if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
433 # Create pipeline for local run
434 self.pipeline = self.api.pipeline_instances().create(
436 "owner_uuid": self.project_uuid,
437 "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
439 "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
440 logger.info("Pipeline instance %s", self.pipeline["uuid"])
# --submit with --no-wait: fire off the runner and return immediately.
442 if runnerjob and not kwargs.get("wait"):
443 runnerjob.run(wait=kwargs.get("wait"))
444 return (runnerjob.uuid, "success")
446 self.poll_api = arvados.api('v1')
447 self.polling_thread = threading.Thread(target=self.poll_states)
448 self.polling_thread.start()
451 jobiter = iter((runnerjob,))
453 if "cwl_runner_job" in kwargs:
454 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
455 jobiter = tool.job(job_order,
456 self.output_callback,
461 # Will continue to hold the lock for the duration of this code
462 # except when in cond.wait(), at which point on_message can update
463 # job state and process output callbacks.
465 loopperf = Perf(metrics, "jobiter")
467 for runnable in jobiter:
470 if self.stop_polling.is_set():
474 with Perf(metrics, "run"):
475 runnable.run(**kwargs)
480 logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
# Wait for any still-running processes before finishing.
485 while self.processes:
488 except UnsupportedRequirement:
491 if sys.exc_info()[0] is KeyboardInterrupt:
492 logger.error("Interrupted, marking pipeline as failed")
494 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
496 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
497 body={"state": "Failed"}).execute(num_retries=self.num_retries)
498 if runnerjob and runnerjob.uuid and self.work_api == "containers":
# Cancel the runner container request on failure.
499 self.api.container_requests().update(uuid=runnerjob.uuid,
500 body={"priority": "0"}).execute(num_retries=self.num_retries)
503 self.stop_polling.set()
504 self.polling_thread.join()
506 if self.final_status == "UnsupportedRequirement":
507 raise UnsupportedRequirement("Check log for details.")
509 if self.final_output is None:
510 raise WorkflowException("Workflow did not return a result.")
512 if kwargs.get("submit") and isinstance(runnerjob, Runner):
513 logger.info("Final output collection %s", runnerjob.final_output)
515 if self.output_name is None:
516 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
517 if self.output_tags is None:
518 self.output_tags = ""
519 self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
520 self.set_crunch_output()
522 if kwargs.get("compute_checksum"):
523 adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
524 adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
526 return (self.final_output, self.final_status)
530 """Print version string of key packages for provenance and debugging."""
532 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
533 arvpkg = pkg_resources.require("arvados-python-client")
534 cwlpkg = pkg_resources.require("cwltool")
536 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
537 "arvados-python-client", arvpkg[0].version,
538 "cwltool", cwlpkg[0].version)
# Build the command-line parser for arvados-cwl-runner.  NOTE(review): this
# listing is line-sampled; several add_argument calls below are missing
# their trailing keyword arguments / closing parentheses on elided lines.
541 def arg_parser():  # type: () -> argparse.ArgumentParser
542 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
544 parser.add_argument("--basedir", type=str,
545 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
546 parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
547 help="Output directory, default current directory")
549 parser.add_argument("--eval-timeout",
550 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
553 parser.add_argument("--version", action="store_true", help="Print version and exit")
# Mutually exclusive verbosity levels.
555 exgroup = parser.add_mutually_exclusive_group()
556 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
557 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
558 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
560 parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
562 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
# Job-reuse toggle; both options share dest="enable_reuse" (default True).
564 exgroup = parser.add_mutually_exclusive_group()
565 exgroup.add_argument("--enable-reuse", action="store_true",
566 default=True, dest="enable_reuse",
568 exgroup.add_argument("--disable-reuse", action="store_false",
569 default=True, dest="enable_reuse",
572 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
573 parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
574 parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
575 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
576 help="Ignore Docker image version when deciding whether to reuse past jobs.",
# Run mode: submit to the cluster (default) vs run locally, or just
# create/update a workflow record.
579 exgroup = parser.add_mutually_exclusive_group()
580 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
581 default=True, dest="submit")
582 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
583 default=True, dest="submit")
584 exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
585 dest="create_workflow")
586 exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
587 exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
589 exgroup = parser.add_mutually_exclusive_group()
590 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
591 default=True, dest="wait")
592 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
593 default=True, dest="wait")
595 exgroup = parser.add_mutually_exclusive_group()
596 exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
597 default=True, dest="log_timestamps")
598 exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
599 default=True, dest="log_timestamps")
601 parser.add_argument("--api", type=str,
602 default=None, dest="work_api",
603 help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
605 parser.add_argument("--compute-checksum", action="store_true", default=False,
606 help="Compute checksum of contents while collecting outputs",
607 dest="compute_checksum")
609 parser.add_argument("--submit-runner-ram", type=int,
610 help="RAM (in MiB) required for the workflow runner job (default 1024)",
613 parser.add_argument("--submit-runner-image", type=str,
614 help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
617 parser.add_argument("--name", type=str,
618 help="Name to use for workflow execution instance.",
621 parser.add_argument("--on-error", type=str,
622 help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
623 "Default is 'continue'.", default="continue", choices=("stop", "continue"))
# Positional arguments: the workflow document and its input object(s).
625 parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
626 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
# NOTE(review): the enclosing function header (presumably the hint-schema
# loader) and the origin of "cache" are elided in this sampled listing.
# Loads the bundled arv-cwl-schema.yml extension schema and registers each
# extension name alongside the standard CWL v1.0 names so documents using
# http://arvados.org/cwl# hints validate.
632 res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
633 cache["http://arvados.org/cwl"] = res.read()
635 document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
636 _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
637 for n in extnames.names:
638 if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
639 cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
640 document_loader.idx["http://arvados.org/cwl#"+n] = {}
# CLI entry point: parses arguments, reconciles --update-workflow with
# --api, constructs the ArvCwlRunner, configures logging, then delegates to
# cwltool.main.main with Arvados-specific factories.  NOTE(review): this
# listing is line-sampled; several branches (e.g. the version early-exit,
# try/except bodies) are partly elided.
642 def main(args, stdout, stderr, api_client=None, keep_client=None):
643 parser = arg_parser()
645 job_order_object = None
646 arvargs = parser.parse_args(args)
649 print versionstring()
# Infer the required API from the uuid's type infix: "-7fd4e-" is a
# workflow (containers API), "-p5p6p-" a pipeline template (jobs API).
652 if arvargs.update_workflow:
653 if arvargs.update_workflow.find('-7fd4e-') == 5:
654 want_api = 'containers'
655 elif arvargs.update_workflow.find('-p5p6p-') == 5:
659 if want_api and arvargs.work_api and want_api != arvargs.work_api:
660 logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
661 arvargs.update_workflow, want_api, arvargs.work_api))
663 arvargs.work_api = want_api
# Creating/updating a workflow record needs no real job order.
665 if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
666 job_order_object = ({}, "")
671 if api_client is None:
672 api_client=arvados.api('v1', model=OrderedJsonModel())
673 if keep_client is None:
674 keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
675 runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
676 num_retries=4, output_name=arvargs.output_name,
677 output_tags=arvargs.output_tags)
678 except Exception as e:
# Logging setup: --debug / --quiet adjust levels; --metrics enables the
# timing loggers; --log-timestamps controls the formatter.
683 logger.setLevel(logging.DEBUG)
684 logging.getLogger('arvados').setLevel(logging.DEBUG)
687 logger.setLevel(logging.WARN)
688 logging.getLogger('arvados').setLevel(logging.WARN)
689 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
692 metrics.setLevel(logging.DEBUG)
693 logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
695 if arvargs.log_timestamps:
696 arvados.log_handler.setFormatter(logging.Formatter(
697 '%(asctime)s %(name)s %(levelname)s: %(message)s',
698 '%Y-%m-%d %H:%M:%S'))
700 arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
# Force settings cwltool expects before handing over control.
702 arvargs.conformance_test = None
703 arvargs.use_container = True
704 arvargs.relax_path_checks = True
705 arvargs.validate = None
707 return cwltool.main.main(args=arvargs,
710 executor=runner.arv_executor,
711 makeTool=runner.arv_make_tool,
712 versionfunc=versionstring,
713 job_order_object=job_order_object,
714 make_fs_access=partial(CollectionFsAccess,
715 api_client=api_client,
716 keep_client=keep_client),
717 fetcher_constructor=partial(CollectionFetcher,
718 api_client=api_client,
719 keep_client=keep_client,
720 num_retries=runner.num_retries),
721 resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
722 logger_handler=arvados.log_handler)