# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import copy
import json
import logging
import os
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad.schema
from schema_salad.sourceline import SourceLine

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """
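
    # Rough usage sketch, mirroring how main() below wires things together
    # (assumes a configured Arvados API client):
    #
    #     api = arvados.api('v1', model=OrderedJsonModel())
    #     runner = ArvCwlRunner(api, work_api="containers")
    #     # cwltool drives document loading and validation, then calls back
    #     # into the runner to create tools and execute the workflow:
    #     cwltool.main.main(args=arg_parser().parse_args(sys.argv[1:]),
    #                       executor=runner.arv_executor,
    #                       makeTool=runner.arv_make_tool)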

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
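
        # Choose the work submission API by probing the API server's
        # discovery document: use the requested API if it is advertised;
        # when --api is not given, prefer "jobs" if available, else
        # "containers".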
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
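
    # makeTool hook for cwltool: route CommandLineTool and Workflow documents
    # to their Arvados implementations and let cwltool's default factory
    # handle everything else (e.g. ExpressionTool).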
    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
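
    # Handle a state-change notification for one of our submitted processes.
    # Events are synthesized by the polling thread below from job/container
    # request listings.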
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        # e.g. "[container foo]" or "[job foo]" (work_api minus trailing "s").
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair
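
    # Recursively scan the CWL document and raise UnsupportedRequirement for
    # features the selected API cannot run, pointing at the offending source
    # line where possible.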
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("class") == "InitialWorkDirRequirement":
                if self.work_api == "containers":
                    raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "CommandLineTool":
                if self.work_api == "containers":
                    if obj.get("stdin"):
                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not supported with --api=containers")
                    if obj.get("stderr"):
                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not supported with --api=containers")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)
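
    # Build the final output collection: copy each file/directory referenced
    # by the CWL output object out of its source collection into a single new
    # collection, write cwl.output.json, then rewrite the output object's
    # locations to point at the new collection.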
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',') if tagsString else []
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
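
    # If this runner is itself executing inside a Crunch container or job,
    # record the final output collection on that container/job record so the
    # API server reports it as the output of the run.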
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of
        # files to keep references.  Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps.
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                                        kwargs["name"],
                                        tool.doc_loader,
                                        tool.tool,
                                        tool.tool["id"],
                                        False)
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")
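
        # Start the status-polling thread, giving it its own API client
        # object; the underlying HTTP client is not thread-safe, so it should
        # not be shared with the main thread.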
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)
529 """Print version string of key packages for provenance and debugging."""
531 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
532 arvpkg = pkg_resources.require("arvados-python-client")
533 cwlpkg = pkg_resources.require("cwltool")
535 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
536 "arvados-python-client", arvpkg[0].version,
537 "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, defaults to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
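

# Register the Arvados CWL extension schemas (arv-cwl-schema.yml) with
# cwltool's document loader so workflows using those hints validate cleanly.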
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()

    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}


def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return
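
    # Pick the right API from the --update-workflow UUID's type infix:
    # '-7fd4e-' is a workflow record (containers API), '-p5p6p-' a
    # pipeline template (jobs API).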
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
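
    # cwltool.main expects these attributes on the parsed args object; they
    # are fixed here rather than exposed as command line options.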
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None
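
    # Hand off to cwltool's main loop, plugging in our executor, tool
    # factory, and Keep-backed file access, fetcher, and resolver hooks.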
    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client,
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler)