3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
15 from functools import partial
16 import pkg_resources # part of setuptools
18 from cwltool.errors import WorkflowException
20 import cwltool.workflow
21 import cwltool.process
23 from schema_salad.sourceline import SourceLine
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
30 from .arvcontainer import ArvadosContainer, RunnerContainer
31 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
32 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
33 from .arvtool import ArvadosCommandTool
34 from .arvworkflow import ArvadosWorkflow, upload_workflow
35 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
36 from .perf import Perf
37 from .pathmapper import NoFollowPathMapper
38 from ._version import __version__
40 from cwltool.pack import pack
41 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
42 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
43 from cwltool.draft2tool import compute_checksums
44 from arvados.api import OrderedJsonModel
# Module-level loggers: `logger` for general runner messages, `metrics` for
# timing data emitted via Perf; default verbosity is INFO and is adjusted
# later by main() based on --debug/--quiet.
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48 logger.setLevel(logging.INFO)
# Default log line format; main() re-applies a format depending on
# --log-timestamps.
# NOTE(review): the closing of this setFormatter call (orig line 52) is
# elided from this listing.
50 arvados.log_handler.setFormatter(logging.Formatter(
51 '%(asctime)s %(name)s %(levelname)s: %(message)s',
# ArvCwlRunner coordinates a CWL run against Arvados: it builds tool objects,
# submits work via the jobs or containers API, tracks process state, and
# assembles the final output collection.
54 class ArvCwlRunner(object):
55 """Execute a CWL tool or workflow, submit work (using either jobs or
56 containers API), wait for them to complete, and report output.
# NOTE(review): this listing elides several constructor lines (e.g. orig
# 61-62, 67, 69, 71-72, 79, 82, 84, 86-87, 90, 94-100, 102); comments below
# describe only what the visible lines establish.
60 def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
# Lock/condition pair: on_message() and the executor loop coordinate through
# self.cond while sharing self.lock.
63 self.lock = threading.Lock()
64 self.cond = threading.Condition(self.lock)
# Populated by output_callback() when the workflow finishes.
65 self.final_output = None
66 self.final_status = None
68 self.num_retries = num_retries
# Event used to stop the poll_states() background thread.
70 self.stop_polling = threading.Event()
73 self.final_output_collection = None
74 self.output_name = output_name
75 self.output_tags = output_tags
76 self.project_uuid = None
# TTL for intermediate collections; 0 disables trashing (see arv_executor).
77 self.intermediate_output_ttl = 0
78 self.intermediate_output_collections = []
# Use the caller-supplied KeepClient when given; otherwise build one
# (the `else:` line is presumably among the elided lines — confirm).
80 if keep_client is not None:
81 self.keep_client = keep_client
83 self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
85 self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
# Probe the API server's discovery document to pick a supported work API
# ("jobs" or "containers"), honoring an explicit work_api request.
88 expected_api = ["jobs", "containers"]
89 for api in expected_api:
91 methods = self.api._rootDesc.get('resources')[api]['methods']
92 if ('httpMethod' in methods['create'] and
93 (work_api == api or work_api is None)):
# (lines that record self.work_api and exit the loop are elided)
101 raise Exception("No supported APIs")
103 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
# Tool factory handed to cwltool: returns Arvados-specific tool classes for
# CommandLineTool and Workflow documents, falling back to cwltool's default
# factory for anything else (e.g. ExpressionTool).
105 def arv_make_tool(self, toolpath_object, **kwargs):
106 kwargs["work_api"] = self.work_api
# The fetcher resolves keep:/arvwf: references through the shared
# collection cache.
# NOTE(review): one constructor argument line (orig 108) is elided here.
107 kwargs["fetcher_constructor"] = partial(CollectionFetcher,
109 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
110 num_retries=self.num_retries)
111 if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
112 return ArvadosCommandTool(self, toolpath_object, **kwargs)
113 elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
114 return ArvadosWorkflow(self, toolpath_object, **kwargs)
116 return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
# Final-output callback handed to cwltool: records overall success/failure
# and, for a local jobs-API run, marks the pipeline instance record
# Complete or Failed accordingly.
# NOTE(review): this listing elides some lines (orig 121, 124, 126) —
# presumably `if self.pipeline:` guards and the `else:` branch; confirm
# against the full source.
118 def output_callback(self, out, processStatus):
119 if processStatus == "success":
120 logger.info("Overall process status is %s", processStatus)
122 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
123 body={"state": "Complete"}).execute(num_retries=self.num_retries)
125 logger.warn("Overall process status is %s", processStatus)
127 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
128 body={"state": "Failed"}).execute(num_retries=self.num_retries)
# Stash status/output for arv_executor to pick up after the run loop exits.
129 self.final_status = processStatus
130 self.final_output = out
# Event handler for job/container state changes (fed by the websocket
# subscription or by events synthesized in poll_states()). Looks up the
# tracked process by object_uuid and drives its state transitions.
132 def on_message(self, event):
133 if "object_uuid" in event:
134 if event["object_uuid"] in self.processes and event["event_type"] == "update":
# Transition to Running: log it and update the pipeline component record.
135 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
136 uuid = event["object_uuid"]
# NOTE(review): elided lines here (orig 137, 140) presumably acquire
# self.lock/cond and mark j.running — confirm against the full source.
138 j = self.processes[uuid]
139 logger.info("%s %s is Running", self.label(j), uuid)
141 j.update_pipeline_component(event["properties"]["new_attributes"])
# Terminal states: hand the final record to the process object's done().
142 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
143 uuid = event["object_uuid"]
146 j = self.processes[uuid]
147 logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
148 with Perf(metrics, "done %s" % j.name):
149 j.done(event["properties"]["new_attributes"])
154 def label(self, obj):
155 return "[%s %s]" % (self.work_api[0:-1], obj.name)
# Background thread body: periodically fetch the current state of all
# tracked jobs/containers and synthesize "update" events for on_message().
# NOTE(review): many lines are elided in this listing (the loop header,
# try/except structure, event dispatch, final cleanup); comments reflect
# only the visible statements.
157 def poll_states(self):
158 """Poll status of jobs or containers listed in the processes dict.
160 Runs in a separate thread.
# Sleep up to 15s per iteration, waking immediately if stop_polling is set.
165 self.stop_polling.wait(15)
166 if self.stop_polling.is_set():
169 keys = self.processes.keys()
# Choose the resource matching the active work API.
173 if self.work_api == "containers":
174 table = self.poll_api.container_requests()
175 elif self.work_api == "jobs":
176 table = self.poll_api.jobs()
179 proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
180 except Exception as e:
# Transient API errors are logged; the loop presumably retries — confirm.
181 logger.warn("Error checking states on API server: %s", e)
# Synthesized event mirrors the websocket payload shape on_message expects.
184 for p in proc_states["items"]:
186 "object_uuid": p["uuid"],
187 "event_type": "update",
193 logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
# On fatal error: drop tracked processes and stop polling so the main
# executor loop can unblock.
195 self.processes.clear()
199 self.stop_polling.set()
201 def get_uploaded(self):
202 return self.uploaded.copy()
204 def add_uploaded(self, src, pair):
205 self.uploaded[src] = pair
# Remember an intermediate output collection so it can be trashed after a
# successful run (see trash_intermediate_output).
# NOTE(review): a guard line (orig 208) is elided — presumably the uuid is
# only recorded when intermediate_output_ttl > 0; confirm in full source.
207 def add_intermediate_output(self, uuid):
209 self.intermediate_output_collections.append(uuid)
# Best-effort deletion of recorded intermediate collections after a
# successful workflow run. Individual failures are logged rather than
# raised; KeyboardInterrupt appears to be treated specially (its handling
# after orig line 218 is elided from this listing — confirm).
211 def trash_intermediate_output(self):
212 logger.info("Cleaning up intermediate output collections")
213 for i in self.intermediate_output_collections:
215 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
217 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
218 if sys.exc_info()[0] is KeyboardInterrupt:
221 def check_features(self, obj):
222 if isinstance(obj, dict):
223 if obj.get("writable"):
224 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
225 if obj.get("class") == "DockerRequirement":
226 if obj.get("dockerOutputDirectory"):
227 # TODO: can be supported by containers API, but not jobs API.
228 raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
229 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
230 for v in obj.itervalues():
231 self.check_features(v)
232 elif isinstance(obj, list):
233 for i,v in enumerate(obj):
234 with SourceLine(obj, i, UnsupportedRequirement):
235 self.check_features(v)
# Build the final output collection: copy every file/directory referenced by
# outputObj ("keep:" references and "_:" literals) into a new collection,
# rewrite outputObj's locations to point into it, save it (optionally
# tagged), and return (rewritten outputObj, Collection).
# NOTE(review): several lines are elided in this listing (e.g. the `files`
# list initialization near orig 240, the split producing `sp` near orig 264,
# try/except headers, the tag loop header near orig 296).
237 def make_output_collection(self, name, tagsString, outputObj):
# Deep-copy so the caller's structure is not mutated while rewriting.
238 outputObj = copy.deepcopy(outputObj)
# Collect every File/Directory object from the output into `files`.
241 def capture(fileobj):
242 files.append(fileobj)
244 adjustDirObjs(outputObj, capture)
245 adjustFileObjs(outputObj, capture)
247 generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
249 final = arvados.collection.Collection(api_client=self.api,
250 keep_client=self.keep_client,
251 num_retries=self.num_retries)
253 for k,v in generatemapper.items():
# "_:" keys are literals created during the run rather than keep references.
254 if k.startswith("_:"):
255 if v.type == "Directory":
257 if v.type == "CreateFile":
258 with final.open(v.target, "wb") as f:
259 f.write(v.resolved.encode("utf-8"))
# Anything else must be a keep reference.
262 if not k.startswith("keep:"):
263 raise Exception("Output source is not in keep or a literal")
# `sp` is presumably k split on "/" (elided); sp[0][5:] strips "keep:".
265 srccollection = sp[0][5:]
267 reader = self.collection_cache.get(srccollection)
268 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
269 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
270 except arvados.errors.ArgumentError as e:
271 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
274 logger.warn("While preparing output collection: %s", e)
# Point locations at targets inside the new collection and drop derived
# fields that are now stale (the deletion body after orig 278 is elided).
276 def rewrite(fileobj):
277 fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
278 for k in ("basename", "listing", "contents"):
282 adjustDirObjs(outputObj, rewrite)
283 adjustFileObjs(outputObj, rewrite)
# Persist the CWL output object alongside the data itself.
285 with final.open("cwl.output.json", "w") as f:
286 json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
288 final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
290 logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
291 final.api_response()["name"],
292 final.manifest_locator())
# Attach one "tag" link per comma-separated entry in tagsString.
294 final_uuid = final.manifest_locator()
295 tags = tagsString.split(',')
297 self.api.links().create(body={
298 "head_uuid": final_uuid, "link_class": "tag", "name": tag
299 }).execute(num_retries=self.num_retries)
# Finally rewrite locations into fully-qualified keep:PDH/path references.
301 def finalcollection(fileobj):
302 fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
304 adjustDirObjs(outputObj, finalcollection)
305 adjustFileObjs(outputObj, finalcollection)
307 return (outputObj, final)
# When this runner itself executes inside Arvados, record the final output
# PDH on the surrounding container (containers API) or on the current job
# task (jobs API, located via the TASK_UUID environment variable) so the
# platform associates the output with this run.
# NOTE(review): try headers and parts of the update bodies are elided in
# this listing.
309 def set_crunch_output(self):
310 if self.work_api == "containers":
312 current = self.api.containers().current().execute(num_retries=self.num_retries)
313 except ApiError as e:
314 # Status code 404 just means we're not running in a container.
315 if e.resp.status != 404:
316 logger.info("Getting current container: %s", e)
319 self.api.containers().update(uuid=current['uuid'],
321 'output': self.final_output_collection.portable_data_hash(),
322 }).execute(num_retries=self.num_retries)
# Also update the output collection record (body elided at orig 324-325).
323 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
326 }).execute(num_retries=self.num_retries)
327 except Exception as e:
# Best effort: failing to record the output is logged, not fatal.
328 logger.info("Setting container output: %s", e)
329 elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
330 self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
332 'output': self.final_output_collection.portable_data_hash(),
333 'success': self.final_status == "success",
335 }).execute(num_retries=self.num_retries)
# Main cwltool executor entry point. Depending on arguments this either:
#   * creates/updates a stored workflow / pipeline template and returns,
#   * submits a runner job/container to execute the workflow remotely, or
#   * runs the workflow locally, polling job/container state until done,
# then gathers the final output collection and returns (output, status).
# NOTE(review): this listing elides many lines (locking, cond.wait loops,
# several keyword arguments, except/finally headers); comments describe
# only the visible statements.
337 def arv_executor(self, tool, job_order, **kwargs):
338 self.debug = kwargs.get("debug")
# Reject unsupported CWL features up front (see check_features).
340 tool.visit(self.check_features)
342 self.project_uuid = kwargs.get("project_uuid")
# fs_access is reused later for listing expansion and checksum computation.
344 make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
345 collection_cache=self.collection_cache)
346 self.fs_access = make_fs_access(kwargs["basedir"])
348 self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
349 if self.intermediate_output_ttl and self.work_api != "containers":
350 raise Exception("--intermediate-output-ttl is only supported when using the containers api.")
# Derive a run name from the tool label or file name when none was given.
352 if not kwargs.get("name"):
353 kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
355 # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
356 # Also uploads docker images.
357 upload_workflow_deps(self, tool)
359 # Reload tool object which may have been updated by
360 # upload_workflow_deps
361 tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
362 makeTool=self.arv_make_tool,
363 loader=tool.doc_loader,
364 avsc_names=tool.doc_schema,
365 metadata=tool.metadata)
367 # Upload local file references in the job order.
368 job_order = upload_job_order(self, "%s input" % kwargs["name"],
# --create-workflow / --update-workflow: register the workflow and exit
# without running anything.
371 existing_uuid = kwargs.get("update_workflow")
372 if existing_uuid or kwargs.get("create_workflow"):
373 # Create a pipeline template or workflow record and exit.
374 if self.work_api == "jobs":
375 tmpl = RunnerTemplate(self, tool, job_order,
376 kwargs.get("enable_reuse"),
378 submit_runner_ram=kwargs.get("submit_runner_ram"),
381 # cwltool.main will write our return value to stdout.
382 return (tmpl.uuid, "success")
383 elif self.work_api == "containers":
384 return (upload_workflow(self, tool, job_order,
387 submit_runner_ram=kwargs.get("submit_runner_ram"),
388 name=kwargs["name"]),
# From here on we are actually executing the workflow.
391 self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
393 kwargs["make_fs_access"] = make_fs_access
394 kwargs["enable_reuse"] = kwargs.get("enable_reuse")
395 kwargs["use_container"] = True
396 kwargs["tmpdir_prefix"] = "tmp"
397 kwargs["compute_checksum"] = kwargs.get("compute_checksum")
# Fixed in-container paths differ between the two work APIs.
399 if self.work_api == "containers":
400 kwargs["outdir"] = "/var/spool/cwl"
401 kwargs["docker_outdir"] = "/var/spool/cwl"
402 kwargs["tmpdir"] = "/tmp"
403 kwargs["docker_tmpdir"] = "/tmp"
404 elif self.work_api == "jobs":
405 kwargs["outdir"] = "$(task.outdir)"
406 kwargs["docker_outdir"] = "$(task.outdir)"
407 kwargs["tmpdir"] = "$(task.tmpdir)"
# --submit: wrap the workflow in a runner job/container that executes it
# server-side.
410 if kwargs.get("submit"):
411 # Submit a runner job to run the workflow for us.
412 if self.work_api == "containers":
413 if tool.tool["class"] == "CommandLineTool":
414 kwargs["runnerjob"] = tool.tool["id"]
415 upload_dependencies(self,
421 runnerjob = tool.job(job_order,
422 self.output_callback,
425 runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
428 submit_runner_ram=kwargs.get("submit_runner_ram"),
429 name=kwargs.get("name"),
430 on_error=kwargs.get("on_error"),
431 submit_runner_image=kwargs.get("submit_runner_image"),
432 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
433 elif self.work_api == "jobs":
434 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
437 submit_runner_ram=kwargs.get("submit_runner_ram"),
438 name=kwargs.get("name"),
439 on_error=kwargs.get("on_error"),
440 submit_runner_image=kwargs.get("submit_runner_image"))
442 if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
443 # Create pipeline for local run
444 self.pipeline = self.api.pipeline_instances().create(
446 "owner_uuid": self.project_uuid,
447 "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
449 "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
450 logger.info("Pipeline instance %s", self.pipeline["uuid"])
# --no-wait: kick off the runner and return immediately.
452 if runnerjob and not kwargs.get("wait"):
453 runnerjob.run(wait=kwargs.get("wait"))
454 return (runnerjob.uuid, "success")
# Local execution path: start the state-polling thread and drain the job
# iterator produced by the tool (or the single submitted runner).
456 self.poll_api = arvados.api('v1')
457 self.polling_thread = threading.Thread(target=self.poll_states)
458 self.polling_thread.start()
461 jobiter = iter((runnerjob,))
463 if "cwl_runner_job" in kwargs:
464 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
465 jobiter = tool.job(job_order,
466 self.output_callback,
471 # Will continue to hold the lock for the duration of this code
472 # except when in cond.wait(), at which point on_message can update
473 # job state and process output callbacks.
475 loopperf = Perf(metrics, "jobiter")
477 for runnable in jobiter:
480 if self.stop_polling.is_set():
484 with Perf(metrics, "run"):
485 runnable.run(**kwargs)
# No runnable job and nothing pending: the scheduler cannot make progress.
490 logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
495 while self.processes:
# Error handling (except/finally headers elided): on failure, mark the
# pipeline instance Failed and cancel a submitted runner container.
498 except UnsupportedRequirement:
501 if sys.exc_info()[0] is KeyboardInterrupt:
502 logger.error("Interrupted, marking pipeline as failed")
504 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
506 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
507 body={"state": "Failed"}).execute(num_retries=self.num_retries)
508 if runnerjob and runnerjob.uuid and self.work_api == "containers":
509 self.api.container_requests().update(uuid=runnerjob.uuid,
510 body={"priority": "0"}).execute(num_retries=self.num_retries)
# Always shut down the polling thread before post-processing.
513 self.stop_polling.set()
514 self.polling_thread.join()
516 if self.final_status == "UnsupportedRequirement":
517 raise UnsupportedRequirement("Check log for details.")
519 if self.final_output is None:
520 raise WorkflowException("Workflow did not return a result.")
# When a runner was submitted, its output collection is already final.
522 if kwargs.get("submit") and isinstance(runnerjob, Runner):
523 logger.info("Final output collection %s", runnerjob.final_output)
# Otherwise assemble the output collection ourselves.
525 if self.output_name is None:
526 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
527 if self.output_tags is None:
528 self.output_tags = ""
529 self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
530 self.set_crunch_output()
532 if kwargs.get("compute_checksum"):
533 adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
534 adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
536 if self.final_status == "success":
537 self.trash_intermediate_output()
539 return (self.final_output, self.final_status)
# NOTE(review): the `def versionstring():` line (orig 542) is elided from
# this listing; the statements below are its body.
543 """Print version string of key packages for provenance and debugging."""
545 arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
546 arvpkg = pkg_resources.require("arvados-python-client")
547 cwlpkg = pkg_resources.require("cwltool")
# Report argv[0] plus the versions of this package, the Arvados Python SDK
# and cwltool, as resolved from the installed distributions.
549 return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
550 "arvados-python-client", arvpkg[0].version,
551 "cwltool", cwlpkg[0].version)
# Build the argparse parser for the arvados-cwl-runner command line.
# Mutually exclusive groups pair each boolean flag with its negation sharing
# one dest (submit/local, wait/no-wait, log-timestamps/no-log-timestamps).
# NOTE(review): this listing elides a few lines (some defaults and options,
# e.g. after orig 563, 584, 594, 628, 632, 636, 647); the help strings
# themselves document the individual options.
554 def arg_parser(): # type: () -> argparse.ArgumentParser
555 parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
557 parser.add_argument("--basedir", type=str,
558 help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
559 parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
560 help="Output directory, default current directory")
562 parser.add_argument("--eval-timeout",
563 help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
# Output-mode options: exactly one of print-dot / version / validate.
567 exgroup = parser.add_mutually_exclusive_group()
568 exgroup.add_argument("--print-dot", action="store_true",
569 help="Print workflow visualization in graphviz format and exit")
570 exgroup.add_argument("--version", action="store_true", help="Print version and exit")
571 exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
# Verbosity options.
573 exgroup = parser.add_mutually_exclusive_group()
574 exgroup.add_argument("--verbose", action="store_true", help="Default logging")
575 exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
576 exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
578 parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
580 parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
# Job reuse: both flags write to dest "enable_reuse" (default True).
582 exgroup = parser.add_mutually_exclusive_group()
583 exgroup.add_argument("--enable-reuse", action="store_true",
584 default=True, dest="enable_reuse",
586 exgroup.add_argument("--disable-reuse", action="store_false",
587 default=True, dest="enable_reuse",
590 parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
591 parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
592 parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
593 parser.add_argument("--ignore-docker-for-reuse", action="store_true",
594 help="Ignore Docker image version when deciding whether to reuse past jobs.",
# Execution mode: submit to Arvados, run locally, or register a workflow.
597 exgroup = parser.add_mutually_exclusive_group()
598 exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
599 default=True, dest="submit")
600 exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
601 default=True, dest="submit")
602 exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
603 dest="create_workflow")
604 exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
605 exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
607 exgroup = parser.add_mutually_exclusive_group()
608 exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
609 default=True, dest="wait")
610 exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
611 default=True, dest="wait")
613 exgroup = parser.add_mutually_exclusive_group()
614 exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
615 default=True, dest="log_timestamps")
616 exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
617 default=True, dest="log_timestamps")
619 parser.add_argument("--api", type=str,
620 default=None, dest="work_api",
621 help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
623 parser.add_argument("--compute-checksum", action="store_true", default=False,
624 help="Compute checksum of contents while collecting outputs",
625 dest="compute_checksum")
627 parser.add_argument("--submit-runner-ram", type=int,
628 help="RAM (in MiB) required for the workflow runner job (default 1024)",
631 parser.add_argument("--submit-runner-image", type=str,
632 help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
635 parser.add_argument("--name", type=str,
636 help="Name to use for workflow execution instance.",
639 parser.add_argument("--on-error", type=str,
640 help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
641 "Default is 'continue'.", default="continue", choices=("stop", "continue"))
643 parser.add_argument("--enable-dev", action="store_true",
644 help="Enable loading and running development versions "
645 "of CWL spec.", default=False)
646 parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
647 help="If N > 0, intermediate output collections will be trashed N seconds after creation, or on successful completion of workflow (whichever comes first).",
# Positional arguments: the workflow document and its input object(s).
650 parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
651 parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
# NOTE(review): the enclosing `def add_arv_hints():` line is elided from this
# listing; the statements below are its body (main() passes add_arv_hints as
# cwltool's custom_schema_callback).
# Relax cwltool's tool-id acceptance pattern, register the bundled Arvados
# CWL extension schema, and whitelist the Arvados/cwltool extension
# requirements so the validator accepts them.
656 cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
657 cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
# Schema file is shipped inside this package.
658 res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
659 use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
661 cwltool.process.supportedProcessRequirements.extend([
662 "http://arvados.org/cwl#RunInSingleContainer",
663 "http://arvados.org/cwl#OutputDirType",
664 "http://arvados.org/cwl#RuntimeConstraints",
665 "http://arvados.org/cwl#PartitionRequirement",
666 "http://arvados.org/cwl#APIRequirement",
667 "http://commonwl.org/cwltool#LoadListingRequirement",
668 "http://arvados.org/cwl#IntermediateOutput"
# Command-line entry point: parse arguments, configure logging, construct an
# ArvCwlRunner, and hand control to cwltool.main.main with Arvados-specific
# hooks (executor, tool factory, fetcher, resolver, schema callback).
# NOTE(review): this listing elides several lines (want_api initialization,
# exception-handler bodies, some cwltool.main.main keyword arguments).
671 def main(args, stdout, stderr, api_client=None, keep_client=None):
672 parser = arg_parser()
674 job_order_object = None
675 arvargs = parser.parse_args(args)
# Python 2 print statement — this module targets Python 2.
678 print versionstring()
# Infer the target API from the UUID's type infix at offset 5:
# "-7fd4e-" is a workflow (containers API), "-p5p6p-" a pipeline template
# (jobs API).
681 if arvargs.update_workflow:
682 if arvargs.update_workflow.find('-7fd4e-') == 5:
683 want_api = 'containers'
684 elif arvargs.update_workflow.find('-p5p6p-') == 5:
688 if want_api and arvargs.work_api and want_api != arvargs.work_api:
689 logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
690 arvargs.update_workflow, want_api, arvargs.work_api))
692 arvargs.work_api = want_api
# Registering a workflow does not require inputs; use an empty job order.
694 if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
695 job_order_object = ({}, "")
700 if api_client is None:
701 api_client=arvados.api('v1', model=OrderedJsonModel())
702 if keep_client is None:
703 keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
704 runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
705 num_retries=4, output_name=arvargs.output_name,
706 output_tags=arvargs.output_tags)
707 except Exception as e:
# Logging verbosity per --debug/--quiet (handler bodies partly elided).
712 logger.setLevel(logging.DEBUG)
713 logging.getLogger('arvados').setLevel(logging.DEBUG)
716 logger.setLevel(logging.WARN)
717 logging.getLogger('arvados').setLevel(logging.WARN)
718 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
721 metrics.setLevel(logging.DEBUG)
722 logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
724 if arvargs.log_timestamps:
725 arvados.log_handler.setFormatter(logging.Formatter(
726 '%(asctime)s %(name)s %(levelname)s: %(message)s',
727 '%Y-%m-%d %H:%M:%S'))
729 arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
# Force cwltool settings that arvados-cwl-runner manages itself.
731 arvargs.conformance_test = None
732 arvargs.use_container = True
733 arvargs.relax_path_checks = True
734 arvargs.validate = None
736 make_fs_access = partial(CollectionFsAccess,
737 collection_cache=runner.collection_cache)
739 return cwltool.main.main(args=arvargs,
742 executor=runner.arv_executor,
743 makeTool=runner.arv_make_tool,
744 versionfunc=versionstring,
745 job_order_object=job_order_object,
746 make_fs_access=make_fs_access,
747 fetcher_constructor=partial(CollectionFetcher,
748 api_client=api_client,
749 fs_access=make_fs_access(""),
750 num_retries=runner.num_retries),
751 resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
752 logger_handler=arvados.log_handler,
753 custom_schema_callback=add_arv_hints)