3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
15 from functools import partial
16 import pkg_resources # part of setuptools
18 from cwltool.errors import WorkflowException
20 import cwltool.workflow
21 import cwltool.process
23 from schema_salad.sourceline import SourceLine
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
30 from .arvcontainer import ArvadosContainer, RunnerContainer
31 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
32 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
33 from .arvtool import ArvadosCommandTool
34 from .arvworkflow import ArvadosWorkflow, upload_workflow
35 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
36 from .perf import Perf
37 from .pathmapper import NoFollowPathMapper
38 from ._version import __version__
40 from cwltool.pack import pack
41 from cwltool.process import shortname, UnsupportedRequirement, getListing
42 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
43 from cwltool.draft2tool import compute_checksums
44 from arvados.api import OrderedJsonModel
# Module-level loggers: overall runner progress, plus a separate channel
# for timing metrics emitted via Perf().
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

# Timestamped format for the shared arvados log handler.
# NOTE(review): the closing arguments/parens of this Formatter call appear
# elided from this view of the file -- confirm against the full source.
arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.
    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        # NOTE(review): several lines of this constructor appear elided from
        # this view (e.g. the assignment of api_client to self.api, which the
        # code below clearly relies on); comments describe only visible code.

        # Condition variable over a single lock: the event handler notifies it
        # as job/container state changes arrive; the executor waits on it.
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None        # output object recorded by output_callback
        self.final_status = None        # terminal status string recorded by output_callback
        self.num_retries = num_retries  # retry count applied to every API call
        self.stop_polling = threading.Event()  # signals the polling thread to exit
        self.final_output_collection = None
        self.output_name = output_name  # requested name for the final output collection
        self.output_tags = output_tags  # comma-separated tags for that collection
        self.project_uuid = None        # populated later from kwargs in arv_executor()

        # Use the caller-supplied Keep client when given, otherwise build one
        # sharing the API client and retry policy.
        # NOTE(review): an `else:` line appears elided here; as shown the
        # second assignment would always run -- confirm against the full file.
        if keep_client is not None:
            self.keep_client = keep_client
        self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        # Probe the API server's discovery document to pick a supported work
        # API ("jobs" or "containers"), honoring an explicit work_api request.
        # NOTE(review): the loop body that actually selects self.work_api is
        # elided; only the probe condition and the failure paths are visible.
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            methods = self.api._rootDesc.get('resources')[api]['methods']
            if ('httpMethod' in methods['create'] and
                (work_api == api or work_api is None)):
        # Neither API usable at all:
        raise Exception("No supported APIs")
        # Caller asked for an API the server does not offer:
        raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
    def arv_make_tool(self, toolpath_object, **kwargs):
        """Tool factory handed to cwltool: map CWL "CommandLineTool" and
        "Workflow" documents to their Arvados-aware implementations, and
        fall back to cwltool's default factory for anything else."""
        kwargs["work_api"] = self.work_api
        # Resolve document references through Keep collections rather than
        # plain HTTP fetches.
        # NOTE(review): an api_client argument line for this partial() appears
        # elided from this view.
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
    def output_callback(self, out, processStatus):
        """Completion callback from cwltool: record the workflow's final
        status and output, and mirror the terminal state onto the pipeline
        instance record.

        NOTE(review): the guard lines (e.g. `if self.pipeline:` and the
        failure-path `else:`) appear elided from this view; as shown both
        the Complete and Failed updates would run unconditionally.
        """
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        logger.warn("Overall process status is %s", processStatus)
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        # Stash the terminal status/output for arv_executor() to pick up.
        self.final_status = processStatus
        self.final_output = out
    def on_message(self, event):
        """Event handler for job/container state changes (websocket or poll):
        log transitions for processes we are tracking, and finalize a process
        when it reaches a terminal state.

        NOTE(review): lock acquisition (`with self.lock`/`self.cond`) lines
        appear elided from this view; in the upstream source this handler
        synchronizes with arv_executor via the condition variable.
        """
        if "object_uuid" in event:
            # Only react to updates for processes this runner submitted.
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("%s %s is Running", self.label(j), uuid)
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                    # Let the process object collect outputs / report results,
                    # timing the finalization for the metrics logger.
                    with Perf(metrics, "done %s" % j.name):
                        j.done(event["properties"]["new_attributes"])
149 def label(self, obj):
150 return "[%s %s]" % (self.work_api[0:-1], obj.name)
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.

        NOTE(review): the enclosing loop and try/finally scaffolding appear
        elided from this view; comments describe only the visible code.
        """
        # Sleep up to 15s between polls; wakes immediately when stop is set.
        self.stop_polling.wait(15)
        if self.stop_polling.is_set():
        keys = self.processes.keys()
        # Pick the API table matching the work API in use.
        if self.work_api == "containers":
            table = self.poll_api.container_requests()
        elif self.work_api == "jobs":
            table = self.poll_api.jobs()
            proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
        except Exception as e:
            # Transient API errors are logged, not fatal to the thread.
            logger.warn("Error checking states on API server: %s", e)
        # Feed each polled record through the same handler as push events.
        for p in proc_states["items"]:
            "object_uuid": p["uuid"],
            "event_type": "update",
        logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
        # On fatal error: drop tracked processes and signal shutdown so the
        # main thread does not wait forever.
        self.processes.clear()
        self.stop_polling.set()
196 def get_uploaded(self):
197 return self.uploaded.copy()
199 def add_uploaded(self, src, pair):
200 self.uploaded[src] = pair
    def check_features(self, obj):
        """Recursively walk a CWL document fragment and raise
        UnsupportedRequirement (with source-line context) for features
        this runner cannot execute."""
        if isinstance(obj, dict):
            # Writable InitialWorkDir entries are unsupported everywhere.
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "CommandLineTool":
                if self.work_api == "containers":
                    # NOTE(review): the `if obj.get("stdin"):` guard line
                    # appears elided from this view; as shown this raise
                    # would be unconditional under --api=containers.
                    raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
                    if obj.get("stderr"):
                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            # Recurse into nested values (Python 2 dict.itervalues()).
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            # Track list indices so error messages point at the right entry.
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)
    def make_output_collection(self, name, tagsString, outputObj):
        """Assemble the workflow's final output into a single new Keep
        collection: copy referenced files/literals in, save and tag the
        collection, and rewrite output locations to point at it.

        Returns a (rewritten outputObj, Collection) tuple.

        NOTE(review): several lines are elided from this view (e.g. the
        `files` list initialization, the split of keep paths into `sp`,
        try/else scaffolding); comments describe only visible code.
        """
        # Work on a copy so the caller's output object is not mutated while
        # locations are rewritten below.
        outputObj = copy.deepcopy(outputObj)

        # Collect every File/Directory object referenced by the output.
        def capture(fileobj):
            files.append(fileobj)  # NOTE(review): `files` is defined on an elided line

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        # Map each captured location to a target path inside the new collection.
        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                # Literal (non-keep) entries: directory handling is on elided
                # lines; literal file contents are written directly in.
                if v.type == "Directory":
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")

            # NOTE(review): `sp` (the "/"-split of k) is assigned on an elided line.
            # Strip the "keep:" prefix to get the source collection id; cache
            # one CollectionReader per source collection.
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                srccollections[srccollection] = arvados.collection.CollectionReader(
                    keep_client=self.keep_client,
                    num_retries=self.num_retries)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
            reader = srccollections[srccollection]
            # Copy the file (or whole collection, ".") into its target path.
            srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
            final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            logger.warn("While preparing output collection: %s", e)

        # Point each output location at its path inside the new collection,
        # dropping stale metadata keys (loop body elided in this view).
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        # Persist the CWL output object alongside the data files.
        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        # Apply each comma-separated tag as a "tag" link on the collection.
        # NOTE(review): the `for tag in tags:` line appears elided here.
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        self.api.links().create(body={
            "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        # Finally rewrite locations into fully-qualified keep: URIs.
        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
    def set_crunch_output(self):
        """When this runner itself executes inside Crunch, record the final
        output collection on the surrounding container (containers API) or
        job task (jobs API, detected via the TASK_UUID env var).

        NOTE(review): the `try:` lines and parts of the update bodies are
        elided from this view; comments describe only visible code.
        """
        if self.work_api == "containers":
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
            # Attach the output collection to the running container record...
            self.api.containers().update(uuid=current['uuid'],
                'output': self.final_output_collection.portable_data_hash(),
                }).execute(num_retries=self.num_retries)
            # ...and update the collection record itself (body elided).
            self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best-effort: failing to record output is logged, not fatal.
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                'output': self.final_output_collection.portable_data_hash(),
                'success': self.final_status == "success",
                }).execute(num_retries=self.num_retries)
    def arv_executor(self, tool, job_order, **kwargs):
        """Main executor handed to cwltool.main: upload dependencies, then
        either create a workflow/template record, submit a runner job, or run
        the workflow locally; wait for completion and return the final
        (output, status) pair.

        NOTE(review): many lines of this method are elided from this view
        (call continuations, try/except/finally scaffolding, else branches);
        comments describe only the visible code.
        """
        self.debug = kwargs.get("debug")

        # Fail fast on CWL features Arvados cannot run.
        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        # NOTE(review): an api_client argument line for this partial() appears elided.
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        # Default the run name from the tool's label or file name.
        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        # NOTE(review): the continuation of this call is elided.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                # NOTE(review): additional RunnerTemplate arguments are elided.
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                # NOTE(review): additional upload_workflow arguments and the
                # tuple's status element are elided.
                return (upload_workflow(self, tool, job_order,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Fixed cwltool execution options for Arvados runs.
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # Working-directory conventions differ between the two APIs:
        # fixed paths inside containers vs. crunch task substitutions.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    # Single tools run directly as a container request.
                    kwargs["runnerjob"] = tool.tool["id"]
                    # NOTE(review): upload_dependencies / tool.job call
                    # continuations and the intervening else: are elided.
                    upload_dependencies(self,
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            # NOTE(review): the `body={` line of this create() call is elided.
            self.pipeline = self.api.pipeline_instances().create(
                "owner_uuid": self.project_uuid,
                "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # Fire-and-forget submission: kick off the runner and return its uuid.
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        # Separate API client for the polling thread (clients are not shared
        # across threads here).
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        jobiter = iter((runnerjob,))
        if "cwl_runner_job" in kwargs:
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
        # NOTE(review): the continuation of this tool.job() call is elided.
        jobiter = tool.job(job_order,
                           self.output_callback,

        # Will continue to hold the lock for the duration of this code
        # except when in cond.wait(), at which point on_message can update
        # job state and process output callbacks.
        loopperf = Perf(metrics, "jobiter")
        for runnable in jobiter:
            # Polling thread died -- abort rather than hang.
            if self.stop_polling.is_set():
            with Perf(metrics, "run"):
                runnable.run(**kwargs)
            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
        # Wait for all submitted processes to finish (cond.wait lines elided).
        while self.processes:
        except UnsupportedRequirement:
        if sys.exc_info()[0] is KeyboardInterrupt:
            logger.error("Interrupted, marking pipeline as failed")
        logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
        # On failure: mark the pipeline failed and cancel the runner
        # container request (priority "0" cancels it).
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        if runnerjob and runnerjob.uuid and self.work_api == "containers":
            self.api.container_requests().update(uuid=runnerjob.uuid,
                                                 body={"priority": "0"}).execute(num_retries=self.num_retries)
        # Shut down the polling thread before returning.
        self.stop_polling.set()
        self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        # Local run: gather outputs into a new collection ourselves.
        if self.output_name is None:
            self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
        if self.output_tags is None:
            self.output_tags = ""
        self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
        self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)
    """Print version string of key packages for provenance and debugging."""
    # NOTE(review): the enclosing `def versionstring():` line appears elided
    # from this view of the file.
    # Look up installed distributions via setuptools' pkg_resources.
    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command-line argument parser for the Arvados CWL runner.

    NOTE(review): several option lines (type/default/help continuations, and
    the final `return parser`) appear elided from this view of the file.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    # Logging verbosity options are mutually exclusive.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job reuse on/off; both flags share the enable_reuse destination.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",

    # Where/how to run: submit vs. local, create/update workflow records.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    # Positional arguments: the workflow document and its input object.
    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
    # NOTE(review): the enclosing function definition (add_arv_hints in the
    # upstream source) and its opening lines appear elided from this view.
    # Relax cwltool's tool-filename whitelist so any name is accepted.
    cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    # Load the bundled Arvados CWL extension schema and register it under
    # the http://arvados.org/cwl namespace.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()

    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    # Merge extension names into the core CWL name set so documents using
    # Arvados hints validate cleanly.
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Command-line entry point: parse arguments, construct the
    ArvCwlRunner, configure logging, and hand control to cwltool.main.

    NOTE(review): several lines are elided from this view (version guard,
    try/except scaffolding, logging else-branches); comments describe only
    visible code.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    # NOTE(review): presumably guarded by an elided `if arvargs.version:`
    # (Python 2 print statement).
    print versionstring()

    if arvargs.update_workflow:
        # Infer the work API from the UUID's object-type infix:
        # "-7fd4e-" marks a containers workflow, "-p5p6p-" a pipeline template.
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
        arvargs.work_api = want_api

    # Creating/updating a workflow record needs no input object.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    # Build default API/Keep clients when the caller did not inject them.
    # NOTE(review): the enclosing try: line appears elided.
    if api_client is None:
        api_client=arvados.api('v1', model=OrderedJsonModel())
    if keep_client is None:
        keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
    runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                          num_retries=4, output_name=arvargs.output_name,
                          output_tags=arvargs.output_tags)
    except Exception as e:

    # Logging verbosity: debug vs. quiet vs. metrics (the if/elif lines
    # selecting between these branches appear elided from this view).
    logger.setLevel(logging.DEBUG)
    logging.getLogger('arvados').setLevel(logging.DEBUG)
    logger.setLevel(logging.WARN)
    logging.getLogger('arvados').setLevel(logging.WARN)
    logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
    metrics.setLevel(logging.DEBUG)
    logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    # NOTE(review): the `else:` for the plain (no-timestamp) format appears elided.
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Fixed cwltool options not exposed on the Arvados command line.
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    # Delegate the actual run to cwltool, wired up with Arvados-aware
    # executor, tool factory, filesystem access, fetcher, and resolver.
    return cwltool.main.main(args=arvargs,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client,
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler)