3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
15 from functools import partial
16 import pkg_resources # part of setuptools
18 from cwltool.errors import WorkflowException
20 import cwltool.workflow
21 import cwltool.process
23 from schema_salad.sourceline import SourceLine
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
30 from .arvcontainer import ArvadosContainer, RunnerContainer
31 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
32 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
33 from .arvtool import ArvadosCommandTool
34 from .arvworkflow import ArvadosWorkflow, upload_workflow
35 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
36 from .perf import Perf
37 from .pathmapper import NoFollowPathMapper
38 from ._version import __version__
40 from cwltool.pack import pack
41 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
42 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
43 from cwltool.draft2tool import compute_checksums
44 from arvados.api import OrderedJsonModel
# Module-wide loggers. `metrics` is the logger handed to Perf() for timing
# output elsewhere in this file; `logger` carries the runner's ordinary
# messages and defaults to INFO.
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
50 arvados.log_handler.setFormatter(logging.Formatter(
51 '%(asctime)s %(name)s %(levelname)s: %(message)s',
54 class ArvCwlRunner(object):
55 """Execute a CWL tool or workflow, submit work (using either jobs or
56 containers API), wait for them to complete, and report output.
60 def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
63 self.lock = threading.Lock()
64 self.cond = threading.Condition(self.lock)
65 self.final_output = None
66 self.final_status = None
68 self.num_retries = num_retries
70 self.stop_polling = threading.Event()
73 self.final_output_collection = None
74 self.output_name = output_name
75 self.output_tags = output_tags
76 self.project_uuid = None
78 if keep_client is not None:
79 self.keep_client = keep_client
81 self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
83 self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
86 expected_api = ["jobs", "containers"]
87 for api in expected_api:
89 methods = self.api._rootDesc.get('resources')[api]['methods']
90 if ('httpMethod' in methods['create'] and
91 (work_api == api or work_api is None)):
99 raise Exception("No supported APIs")
101 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
103 def arv_make_tool(self, toolpath_object, **kwargs):
104 kwargs["work_api"] = self.work_api
105 kwargs["fetcher_constructor"] = partial(CollectionFetcher,
107 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
108 num_retries=self.num_retries)
109 if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
110 return ArvadosCommandTool(self, toolpath_object, **kwargs)
111 elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
112 return ArvadosWorkflow(self, toolpath_object, **kwargs)
114 return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
116 def output_callback(self, out, processStatus):
117 if processStatus == "success":
118 logger.info("Overall process status is %s", processStatus)
120 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
121 body={"state": "Complete"}).execute(num_retries=self.num_retries)
123 logger.warn("Overall process status is %s", processStatus)
125 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
126 body={"state": "Failed"}).execute(num_retries=self.num_retries)
127 self.final_status = processStatus
128 self.final_output = out
130 def on_message(self, event):
131 if "object_uuid" in event:
132 if event["object_uuid"] in self.processes and event["event_type"] == "update":
133 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
134 uuid = event["object_uuid"]
136 j = self.processes[uuid]
137 logger.info("%s %s is Running", self.label(j), uuid)
139 j.update_pipeline_component(event["properties"]["new_attributes"])
140 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
141 uuid = event["object_uuid"]
144 j = self.processes[uuid]
145 logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
146 with Perf(metrics, "done %s" % j.name):
147 j.done(event["properties"]["new_attributes"])
def label(self, obj):
    """Return the bracketed prefix used when logging about ``obj``.

    The prefix is ``self.work_api`` with its final character removed
    (e.g. "jobs" -> "job", "containers" -> "container") followed by
    ``obj.name``.
    """
    api_singular = self.work_api[:-1]
    return "[%s %s]" % (api_singular, obj.name)
155 def poll_states(self):
156 """Poll status of jobs or containers listed in the processes dict.
158 Runs in a separate thread.
163 self.stop_polling.wait(15)
164 if self.stop_polling.is_set():
167 keys = self.processes.keys()
171 if self.work_api == "containers":
172 table = self.poll_api.container_requests()
173 elif self.work_api == "jobs":
174 table = self.poll_api.jobs()
177 proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
178 except Exception as e:
179 logger.warn("Error checking states on API server: %s", e)
182 for p in proc_states["items"]:
184 "object_uuid": p["uuid"],
185 "event_type": "update",
191 logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
193 self.processes.clear()
197 self.stop_polling.set()
def get_uploaded(self):
    """Return a copy of ``self.uploaded`` made with its ``copy()`` method.

    NOTE(review): ``self.uploaded`` is presumably a plain dict, in which case
    this is a shallow copy -- confirm where it is initialised.
    """
    snapshot = self.uploaded.copy()
    return snapshot
def add_uploaded(self, src, pair):
    """Store ``pair`` under key ``src`` in ``self.uploaded``.

    An existing entry for ``src`` is replaced.
    """
    uploaded = self.uploaded
    uploaded[src] = pair
def check_features(self, obj):
    """Recursively reject CWL constructs this runner does not support.

    Walks ``obj`` depth-first through nested dicts and lists and raises on
    the first unsupported construct found:

    * any dict with a truthy ``writable`` entry;
    * a dict whose ``class`` is ``DockerRequirement`` and that has a truthy
      ``dockerOutputDirectory`` entry.

    Values that are neither dicts nor lists are ignored.

    Raises:
        The exception built by ``SourceLine(...).makeError`` for
        ``UnsupportedRequirement`` when an unsupported construct is found.
    """
    if isinstance(obj, dict):
        if obj.get("writable"):
            raise SourceLine(obj, "writable", UnsupportedRequirement).makeError(
                "InitialWorkDir feature 'writable: true' not supported")
        if obj.get("class") == "DockerRequirement" and obj.get("dockerOutputDirectory"):
            # TODO: can be supported by containers API, but not jobs API.
            raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
        # values() is available on both Python 2 and Python 3, whereas
        # itervalues() exists only on Python 2.
        for value in obj.values():
            self.check_features(value)
    elif isinstance(obj, list):
        for index, item in enumerate(obj):
            # NOTE(review): SourceLine is used as a context manager here,
            # presumably to attach the list position to any error raised by
            # the nested check -- confirm against schema_salad.
            with SourceLine(obj, index, UnsupportedRequirement):
                self.check_features(item)
221 def make_output_collection(self, name, tagsString, outputObj):
222 outputObj = copy.deepcopy(outputObj)
225 def capture(fileobj):
226 files.append(fileobj)
228 adjustDirObjs(outputObj, capture)
229 adjustFileObjs(outputObj, capture)
231 generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
233 final = arvados.collection.Collection(api_client=self.api,
234 keep_client=self.keep_client,
235 num_retries=self.num_retries)
237 for k,v in generatemapper.items():
238 if k.startswith("_:"):
239 if v.type == "Directory":
241 if v.type == "CreateFile":
242 with final.open(v.target, "wb") as f:
243 f.write(v.resolved.encode("utf-8"))
246 if not k.startswith("keep:"):
247 raise Exception("Output source is not in keep or a literal")
249 srccollection = sp[0][5:]
251 reader = self.collection_cache.get(srccollection)
252 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
253 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
254 except arvados.errors.ArgumentError as e:
255 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
258 logger.warn("While preparing output collection: %s", e)
260 def rewrite(fileobj):
261 fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
262 for k in ("basename", "listing", "contents"):
266 adjustDirObjs(outputObj, rewrite)
267 adjustFileObjs(outputObj, rewrite)
269 with final.open("cwl.output.json", "w") as f:
270 json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
272 final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
274 logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
275 final.api_response()["name"],
276 final.manifest_locator())
278 final_uuid = final.manifest_locator()
279 tags = tagsString.split(',')
281 self.api.links().create(body={
282 "head_uuid": final_uuid, "link_class": "tag", "name": tag
283 }).execute(num_retries=self.num_retries)
285 def finalcollection(fileobj):
286 fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
288 adjustDirObjs(outputObj, finalcollection)
289 adjustFileObjs(outputObj, finalcollection)
291 return (outputObj, final)
293 def set_crunch_output(self):
294 if self.work_api == "containers":
296 current = self.api.containers().current().execute(num_retries=self.num_retries)
297 except ApiError as e:
298 # Status code 404 just means we're not running in a container.
299 if e.resp.status != 404:
300 logger.info("Getting current container: %s", e)
303 self.api.containers().update(uuid=current['uuid'],
305 'output': self.final_output_collection.portable_data_hash(),
306 }).execute(num_retries=self.num_retries)
307 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
310 }).execute(num_retries=self.num_retries)
311 except Exception as e:
312 logger.info("Setting container output: %s", e)
313 elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
314 self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
316 'output': self.final_output_collection.portable_data_hash(),
317 'success': self.final_status == "success",
319 }).execute(num_retries=self.num_retries)
321 def arv_executor(self, tool, job_order, **kwargs):
322 self.debug = kwargs.get("debug")
324 tool.visit(self.check_features)
326 self.project_uuid = kwargs.get("project_uuid")
328 make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
329 collection_cache=self.collection_cache)
330 self.fs_access = make_fs_access(kwargs["basedir"])
332 if not kwargs.get("name"):
333 kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
335 # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
336 # Also uploads docker images.
337 upload_workflow_deps(self, tool)
339 # Reload tool object which may have been updated by
340 # upload_workflow_deps
341 tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
342 makeTool=self.arv_make_tool,
343 loader=tool.doc_loader,
344 avsc_names=tool.doc_schema,
345 metadata=tool.metadata)
347 # Upload local file references in the job order.
348 job_order = upload_job_order(self, "%s input" % kwargs["name"],
351 existing_uuid = kwargs.get("update_workflow")
352 if existing_uuid or kwargs.get("create_workflow"):
353 # Create a pipeline template or workflow record and exit.
354 if self.work_api == "jobs":
355 tmpl = RunnerTemplate(self, tool, job_order,
356 kwargs.get("enable_reuse"),
358 submit_runner_ram=kwargs.get("submit_runner_ram"),
361 # cwltool.main will write our return value to stdout.
362 return (tmpl.uuid, "success")
363 elif self.work_api == "containers":
364 return (upload_workflow(self, tool, job_order,
367 submit_runner_ram=kwargs.get("submit_runner_ram"),
368 name=kwargs["name"]),
371 self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
373 kwargs["make_fs_access"] = make_fs_access
374 kwargs["enable_reuse"] = kwargs.get("enable_reuse")
375 kwargs["use_container"] = True
376 kwargs["tmpdir_prefix"] = "tmp"
377 kwargs["compute_checksum"] = kwargs.get("compute_checksum")
379 if self.work_api == "containers":
380 kwargs["outdir"] = "/var/spool/cwl"
381 kwargs["docker_outdir"] = "/var/spool/cwl"
382 kwargs["tmpdir"] = "/tmp"
383 kwargs["docker_tmpdir"] = "/tmp"
384 elif self.work_api == "jobs":
385 kwargs["outdir"] = "$(task.outdir)"
386 kwargs["docker_outdir"] = "$(task.outdir)"
387 kwargs["tmpdir"] = "$(task.tmpdir)"
390 if kwargs.get("submit"):
391 # Submit a runner job to run the workflow for us.
392 if self.work_api == "containers":
393 if tool.tool["class"] == "CommandLineTool":
394 kwargs["runnerjob"] = tool.tool["id"]
395 upload_dependencies(self,
401 runnerjob = tool.job(job_order,
402 self.output_callback,
405 runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
408 submit_runner_ram=kwargs.get("submit_runner_ram"),
409 name=kwargs.get("name"),
410 on_error=kwargs.get("on_error"),
411 submit_runner_image=kwargs.get("submit_runner_image"))
412 elif self.work_api == "jobs":
413 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
416 submit_runner_ram=kwargs.get("submit_runner_ram"),
417 name=kwargs.get("name"),
418 on_error=kwargs.get("on_error"),
419 submit_runner_image=kwargs.get("submit_runner_image"))
421 if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
422 # Create pipeline for local run
423 self.pipeline = self.api.pipeline_instances().create(
425 "owner_uuid": self.project_uuid,
426 "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
428 "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
429 logger.info("Pipeline instance %s", self.pipeline["uuid"])
431 if runnerjob and not kwargs.get("wait"):
432 runnerjob.run(wait=kwargs.get("wait"))
433 return (runnerjob.uuid, "success")
435 self.poll_api = arvados.api('v1')
436 self.polling_thread = threading.Thread(target=self.poll_states)
437 self.polling_thread.start()
440 jobiter = iter((runnerjob,))
442 if "cwl_runner_job" in kwargs:
443 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
444 jobiter = tool.job(job_order,
445 self.output_callback,
450 # Will continue to hold the lock for the duration of this code
451 # except when in cond.wait(), at which point on_message can update
452 # job state and process output callbacks.
454 loopperf = Perf(metrics, "jobiter")
456 for runnable in jobiter:
459 if self.stop_polling.is_set():
463 with Perf(metrics, "run"):
464 runnable.run(**kwargs)
469 logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
474 while self.processes:
477 except UnsupportedRequirement:
480 if sys.exc_info()[0] is KeyboardInterrupt:
481 logger.error("Interrupted, marking pipeline as failed")
483 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
485 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
486 body={"state": "Failed"}).execute(num_retries=self.num_retries)
487 if runnerjob and runnerjob.uuid and self.work_api == "containers":
488 self.api.container_requests().update(uuid=runnerjob.uuid,
489 body={"priority": "0"}).execute(num_retries=self.num_retries)
492 self.stop_polling.set()
493 self.polling_thread.join()
495 if self.final_status == "UnsupportedRequirement":
496 raise UnsupportedRequirement("Check log for details.")
498 if self.final_output is None:
499 raise WorkflowException("Workflow did not return a result.")
501 if kwargs.get("submit") and isinstance(runnerjob, Runner):
502 logger.info("Final output collection %s", runnerjob.final_output)
504 if self.output_name is None:
505 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
506 if self.output_tags is None:
507 self.output_tags = ""
508 self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
509 self.set_crunch_output()
511 if kwargs.get("compute_checksum"):
512 adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
513 adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
515 return (self.final_output, self.final_status)
519 """Print version string of key packages for provenance and debugging."""
521 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
522 arvpkg = pkg_resources.require("arvados-python-client")
523 cwlpkg = pkg_resources.require("cwltool")
525 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
526 "arvados-python-client", arvpkg[0].version,
527 "cwltool", cwlpkg[0].version)
530 def arg_parser(): # type: () -> argparse.ArgumentParser
531 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
533 parser.add_argument("--basedir", type=str,
534 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
535 parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
536 help="Output directory, default current directory")
538 parser.add_argument("--eval-timeout",
539 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
543 exgroup = parser.add_mutually_exclusive_group()
544 exgroup.add_argument("--print-dot", action="store_true",
545 help="Print workflow visualization in graphviz format and exit")
546 exgroup.add_argument("--version", action="store_true", help="Print version and exit")
547 exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
549 exgroup = parser.add_mutually_exclusive_group()
550 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
551 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
552 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
554 parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
556 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
558 exgroup = parser.add_mutually_exclusive_group()
559 exgroup.add_argument("--enable-reuse", action="store_true",
560 default=True, dest="enable_reuse",
562 exgroup.add_argument("--disable-reuse", action="store_false",
563 default=True, dest="enable_reuse",
566 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
567 parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
568 parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
569 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
570 help="Ignore Docker image version when deciding whether to reuse past jobs.",
573 exgroup = parser.add_mutually_exclusive_group()
574 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
575 default=True, dest="submit")
576 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
577 default=True, dest="submit")
578 exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
579 dest="create_workflow")
580 exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
581 exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
583 exgroup = parser.add_mutually_exclusive_group()
584 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
585 default=True, dest="wait")
586 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
587 default=True, dest="wait")
589 exgroup = parser.add_mutually_exclusive_group()
590 exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
591 default=True, dest="log_timestamps")
592 exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
593 default=True, dest="log_timestamps")
595 parser.add_argument("--api", type=str,
596 default=None, dest="work_api",
597 help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
599 parser.add_argument("--compute-checksum", action="store_true", default=False,
600 help="Compute checksum of contents while collecting outputs",
601 dest="compute_checksum")
603 parser.add_argument("--submit-runner-ram", type=int,
604 help="RAM (in MiB) required for the workflow runner job (default 1024)",
607 parser.add_argument("--submit-runner-image", type=str,
608 help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
611 parser.add_argument("--name", type=str,
612 help="Name to use for workflow execution instance.",
615 parser.add_argument("--on-error", type=str,
616 help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
617 "Default is 'continue'.", default="continue", choices=("stop", "continue"))
619 parser.add_argument("--enable-dev", action="store_true",
620 help="Enable loading and running development versions "
621 "of CWL spec.", default=False)
623 parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
624 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
629 cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
630 cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
631 res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
632 use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
634 cwltool.process.supportedProcessRequirements.extend([
635 "http://arvados.org/cwl#RunInSingleContainer",
636 "http://arvados.org/cwl#OutputDirType",
637 "http://arvados.org/cwl#RuntimeConstraints",
638 "http://arvados.org/cwl#PartitionRequirement",
639 "http://arvados.org/cwl#APIRequirement",
640 "http://commonwl.org/cwltool#LoadListingRequirement"
643 def main(args, stdout, stderr, api_client=None, keep_client=None):
644 parser = arg_parser()
646 job_order_object = None
647 arvargs = parser.parse_args(args)
650 print versionstring()
653 if arvargs.update_workflow:
654 if arvargs.update_workflow.find('-7fd4e-') == 5:
655 want_api = 'containers'
656 elif arvargs.update_workflow.find('-p5p6p-') == 5:
660 if want_api and arvargs.work_api and want_api != arvargs.work_api:
661 logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
662 arvargs.update_workflow, want_api, arvargs.work_api))
664 arvargs.work_api = want_api
666 if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
667 job_order_object = ({}, "")
672 if api_client is None:
673 api_client=arvados.api('v1', model=OrderedJsonModel())
674 if keep_client is None:
675 keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
676 runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
677 num_retries=4, output_name=arvargs.output_name,
678 output_tags=arvargs.output_tags)
679 except Exception as e:
684 logger.setLevel(logging.DEBUG)
685 logging.getLogger('arvados').setLevel(logging.DEBUG)
688 logger.setLevel(logging.WARN)
689 logging.getLogger('arvados').setLevel(logging.WARN)
690 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
693 metrics.setLevel(logging.DEBUG)
694 logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
696 if arvargs.log_timestamps:
697 arvados.log_handler.setFormatter(logging.Formatter(
698 '%(asctime)s %(name)s %(levelname)s: %(message)s',
699 '%Y-%m-%d %H:%M:%S'))
701 arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
703 arvargs.conformance_test = None
704 arvargs.use_container = True
705 arvargs.relax_path_checks = True
706 arvargs.validate = None
708 make_fs_access = partial(CollectionFsAccess,
709 collection_cache=runner.collection_cache)
711 return cwltool.main.main(args=arvargs,
714 executor=runner.arv_executor,
715 makeTool=runner.arv_make_tool,
716 versionfunc=versionstring,
717 job_order_object=job_order_object,
718 make_fs_access=make_fs_access,
719 fetcher_constructor=partial(CollectionFetcher,
720 api_client=api_client,
721 fs_access=make_fs_access(""),
722 num_retries=runner.num_retries),
723 resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
724 logger_handler=arvados.log_handler,
725 custom_schema_callback=add_arv_hints)