3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
15 from functools import partial
16 import pkg_resources # part of setuptools
18 from cwltool.errors import WorkflowException
20 import cwltool.workflow
21 import cwltool.process
23 from schema_salad.sourceline import SourceLine
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
30 from .arvcontainer import ArvadosContainer, RunnerContainer
31 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
32 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
33 from .arvtool import ArvadosCommandTool
34 from .arvworkflow import ArvadosWorkflow, upload_workflow
35 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
36 from .perf import Perf
37 from .pathmapper import NoFollowPathMapper
38 from ._version import __version__
40 from cwltool.pack import pack
41 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
42 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
43 from cwltool.draft2tool import compute_checksums
44 from arvados.api import OrderedJsonModel
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48 logger.setLevel(logging.INFO)
50 arvados.log_handler.setFormatter(logging.Formatter(
51 '%(asctime)s %(name)s %(levelname)s: %(message)s',
54 class ArvCwlRunner(object):
55 """Execute a CWL tool or workflow, submit work (using either jobs or
56 containers API), wait for them to complete, and report output.
    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        """Set up the runner: API/Keep clients, retry policy, output naming,
        and the synchronization primitives shared with the polling thread.

        NOTE(review): several lines of this method are elided in this chunk
        (e.g. the else branch for keep_client and the try around the API
        probe); comments describe only what is visible.
        """
        # Condition wraps the lock so output_callback/on_message can wake
        # the main thread while it waits for jobs to finish.
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        # Set to ask the state-polling thread to shut down.
        self.stop_polling = threading.Event()
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        # Probe the server's discovery document to choose a supported work
        # API ("jobs" or "containers"), honoring an explicit work_api choice.
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            methods = self.api._rootDesc.get('resources')[api]['methods']
            if ('httpMethod' in methods['create'] and
                (work_api == api or work_api is None)):
        raise Exception("No supported APIs")
        raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
    def arv_make_tool(self, toolpath_object, **kwargs):
        """Tool factory passed to cwltool: build the Arvados-specific tool
        object for a CommandLineTool or Workflow, otherwise defer to
        cwltool's default factory."""
        kwargs["work_api"] = self.work_api
        # Resolve nested documents through Keep collections instead of HTTP.
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries,
                                                overrides=kwargs.get("override_tools"))
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
    def output_callback(self, out, processStatus):
        """Record the workflow's final output object and status, and mirror
        the result onto the pipeline instance record (jobs API).

        NOTE(review): the guards around the pipeline_instances updates are
        elided in this chunk; presumably they are conditional on
        self.pipeline being set — confirm against the full file.
        """
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
        logger.warn("Overall process status is %s", processStatus)
        self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                             body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out
    def on_message(self, event):
        """Event handler for job/container state changes (from the websocket
        client or synthesized by poll_states): log transitions and notify
        the tracked process object."""
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                # Transition into Running: mark and update pipeline component.
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("%s %s is Running", self.label(j), uuid)
                    j.update_pipeline_component(event["properties"]["new_attributes"])
                # Terminal states: let the process object collect its result.
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    j = self.processes[uuid]
                    logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                    # done() may fetch logs/outputs; time it for metrics.
                    with Perf(metrics, "done %s" % j.name):
                        j.done(event["properties"]["new_attributes"])
153 def label(self, obj):
154 return "[%s %s]" % (self.work_api[0:-1], obj.name)
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread; fallback for when websocket events are
        unavailable. NOTE(review): the surrounding while/try structure is
        elided in this chunk.
        """
            # Sleep up to 15s between polls; wakes immediately on stop.
            self.stop_polling.wait(15)
            if self.stop_polling.is_set():
            keys = self.processes.keys()
            # Choose the table matching the active work API.
            if self.work_api == "containers":
                table = self.poll_api.container_requests()
            elif self.work_api == "jobs":
                table = self.poll_api.jobs()
                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
            except Exception as e:
                # Transient API errors are non-fatal; retry on next cycle.
                logger.warn("Error checking states on API server: %s", e)
            # Synthesize "update" events so on_message handles polled state
            # changes exactly like websocket events.
            for p in proc_states["items"]:
                    "object_uuid": p["uuid"],
                    "event_type": "update",
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.processes.clear()
            # Ensure the main loop notices the polling thread is gone.
            self.stop_polling.set()
200 def get_uploaded(self):
201 return self.uploaded.copy()
203 def add_uploaded(self, src, pair):
204 self.uploaded[src] = pair
206 def check_features(self, obj):
207 if isinstance(obj, dict):
208 if obj.get("writable"):
209 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
210 if obj.get("class") == "DockerRequirement":
211 if obj.get("dockerOutputDirectory"):
212 # TODO: can be supported by containers API, but not jobs API.
213 raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
214 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
215 for v in obj.itervalues():
216 self.check_features(v)
217 elif isinstance(obj, list):
218 for i,v in enumerate(obj):
219 with SourceLine(obj, i, UnsupportedRequirement):
220 self.check_features(v)
    def make_output_collection(self, name, tagsString, outputObj):
        """Assemble the workflow's output files into one new collection
        named *name*, tag it with the comma-separated *tagsString*, and
        return (rewritten output object, collection).

        NOTE(review): several structural lines (else/try, the k.split that
        defines sp, the files list initializer) are elided in this chunk.
        """
        # Deep copy so location rewrites don't mutate the caller's object.
        outputObj = copy.deepcopy(outputObj)

        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            # "_:" entries are literals generated in the document itself.
            if k.startswith("_:"):
                if v.type == "Directory":
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))

                if not k.startswith("keep:"):
                    raise Exception("Output source is not in keep or a literal")
                # Strip the "keep:" prefix to get the source collection PDH.
                srccollection = sp[0][5:]
                    reader = self.collection_cache.get(srccollection)
                    srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                    final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                logger.warn("While preparing output collection: %s", e)

        # Rewrite document locations to point at paths inside the new
        # collection, dropping cached metadata that is no longer valid.
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        # Store the output document itself alongside the data.
        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        # Attach each requested tag as a link object on the new collection.
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
    def set_crunch_output(self):
        """When running inside Crunch, record the final output collection on
        our own container record (containers API) or job task (jobs API).

        NOTE(review): the try/else structure and update body dicts are
        partially elided in this chunk.
        """
        if self.work_api == "containers":
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                self.api.containers().update(uuid=current['uuid'],
                                             'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best-effort: failing to record output is logged, not fatal.
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        'output': self.final_output_collection.portable_data_hash(),
                                        'success': self.final_status == "success",
                                        }).execute(num_retries=self.num_retries)
    def arv_executor(self, tool, job_order, **kwargs):
        """Main execution entry point handed to cwltool: upload dependencies,
        then either create a workflow/template record, submit a runner
        job/container, or run the workflow here, and finally return
        (final output object, final status).

        NOTE(review): this chunk elides many structural lines (try blocks,
        else branches, several call arguments); comments describe only the
        visible flow.
        """
        self.debug = kwargs.get("debug")

        # Reject CWL features we cannot run before doing any uploads.
        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool, override_tools)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  override_tools=override_tools)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # Execution-environment paths differ between the two work APIs.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                "owner_uuid": self.project_uuid,
                "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            # Fire-and-forget: submit the runner and return its UUID.
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        # Start the background thread that tracks job/container states.
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

            jobiter = iter((runnerjob,))
        if "cwl_runner_job" in kwargs:
            self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,

            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            for runnable in jobiter:

                if self.stop_polling.is_set():

                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                    logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")

            # Wait for all in-flight processes to reach a terminal state.
            while self.processes:

        except UnsupportedRequirement:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            # Cancel the runner container request on failure.
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
            # Collapse outputs into one named, tagged collection and record
            # it on our own Crunch container/task if applicable.
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)
522 """Print version string of key packages for provenance and debugging."""
524 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
525 arvpkg = pkg_resources.require("arvados-python-client")
526 cwlpkg = pkg_resources.require("cwltool")
528 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
529 "arvados-python-client", arvpkg[0].version,
530 "cwltool", cwlpkg[0].version)
def arg_parser(): # type: () -> argparse.ArgumentParser
    """Build the argparse parser for the arvados-cwl-runner command line.

    NOTE(review): some help strings, defaults, and the trailing return
    statement are elided in this chunk.
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",

    # Mutually exclusive run modes: visualize, print version, validate only.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    # Logging verbosity (exactly one of these).
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job reuse is on by default; both flags share the enable_reuse dest.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",

    # Submission mode: submit to Arvados (default) / run locally /
    # create or update a workflow record instead of running.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    # Positional arguments: the workflow document, then its input object(s).
    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
    # Relax cwltool's process-name whitelist so any identifier is accepted.
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    # Register the Arvados CWL extension schema so documents may use
    # http://arvados.org/cwl# hints without validation errors.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    # Declare the Arvados-specific requirements as supported.
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement"
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Command-line entry point: parse arguments, construct an ArvCwlRunner,
    configure logging, and hand execution off to cwltool.main.main.

    NOTE(review): several guard/else/try lines are elided in this chunk.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

        print versionstring()

    if arvargs.update_workflow:
        # Infer the target API from the UUID infix: -7fd4e- marks a
        # workflow record (containers API), -p5p6p- a pipeline template
        # (jobs API).
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
        arvargs.work_api = want_api

    # Creating/updating a workflow record needs no real input object.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:

        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # cwltool expects these attributes even though we don't expose the
    # corresponding command-line flags.
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)