# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import copy
import json
import logging
import os
import re
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import cwltool.draft2tool
from schema_salad.sourceline import SourceLine

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.output_ttl = 0
        self.intermediate_output_collections = []

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        # Pick the work API: use the one the caller asked for if the API
        # server supports it, otherwise the first supported one.
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
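
    # output_callback() is handed to cwltool (and to the runner job below) as
    # the final-output callback: it is invoked once with the completed output
    # object and the overall status, and also keeps the pipeline instance
    # record for a local "jobs" API run in sync with that status.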
    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)
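
    # State updates arrive via the polling thread below, which synthesizes
    # "update" events from API list results and feeds them to on_message();
    # on_message() in turn notifies self.cond so the wait loops in
    # arv_executor() wake up.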
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()
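
    # get_uploaded()/add_uploaded() expose a simple cache, consulted by the
    # upload helpers elsewhere in this package, of local files that have
    # already been uploaded to Keep, so each source file is uploaded at most
    # once per invocation.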
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # TODO: can be supported by containers API, but not jobs API.
                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)
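
    # make_output_collection() assembles the final output collection: it
    # walks the CWL output object for File/Directory references, copies the
    # referenced Keep data (or writes file literals) into a fresh collection,
    # rewrites each "location" to point into that collection, and saves it
    # together with a cwl.output.json manifest.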
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',') if tagsString else []
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
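
    # When this runner is itself executing inside a Crunch container or job
    # (i.e. it was submitted as a workflow runner), record the final output
    # collection on the surrounding container or job task so it is reported
    # as that process's output.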
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)
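
    # arv_executor() is the executor entry point handed to cwltool.main: it
    # uploads workflow dependencies, optionally just registers a workflow or
    # pipeline template and exits, and otherwise either submits a runner
    # job/container or iterates the tool's job generator locally, waiting on
    # self.cond until the polling thread has reported every submitted process
    # done.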
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.output_ttl = kwargs["intermediate_output_ttl"]
        if self.output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported when using the containers api.")

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            # The containers API runs tools at fixed paths inside the container.
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            # The jobs API substitutes crunch task variables at runtime.
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                                        kwargs["name"],
                                        tool.doc_loader,
                                        tool.tool,
                                        tool.tool["id"],
                                        False)
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.output_ttl and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
542 """Print version string of key packages for provenance and debugging."""
544 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
545 arvpkg = pkg_resources.require("arvados-python-client")
546 cwlpkg = pkg_resources.require("cwltool")
548 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
549 "arvados-python-client", arvpkg[0].version,
550 "cwltool", cwlpkg[0].version)

def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation, or on successful completion of workflow (whichever comes first).",
                        default=0)

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
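
# add_arv_hints() registers the Arvados CWL extension schema and extension
# requirement names with cwltool so documents that use them validate, and
# relaxes cwltool's file name validation pattern (ACCEPTLIST_RE) to accept
# any name.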
def add_arv_hints():
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement"
    ])

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        # The UUID infix identifies the object type: -7fd4e- is a workflow
        # (containers API), -p5p6p- is a pipeline template (jobs API).
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)
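
# The installed console script (arv-cwl-runner) is wired to main() by the
# package's setup. A minimal direct invocation would look roughly like this
# (hypothetical sketch, not part of the installed package):
#
#     if __name__ == "__main__":
#         sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))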