# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import copy
import json
import logging
import os
import re
import sys
import threading
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.collection
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel

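# Module-level loggers: 'arvados.cwl-runner' carries normal progress messages,
# while the separate 'metrics' logger is used by the Perf timing helpers.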
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

DEFAULT_PRIORITY = 500

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

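        # Choose the work submission API. If work_api is None, probe the API
        # server's discovery document and take the first of "jobs" or
        # "containers" that it supports; otherwise verify that the requested
        # API is actually available.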
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

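    # Called by cwltool with the workflow's final output object and overall
    # status. Records both, and for local jobs-API runs marks the pipeline
    # instance Complete or Failed.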
    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

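    # Handle a state-change event for a submitted job or container: log the
    # transition and, when a process reaches a terminal state, run its done()
    # handler and wake up anyone waiting on the shared condition variable.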
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

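    # Record of local files already uploaded to Keep, keyed on source path, so
    # repeated references do not have to be uploaded again.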
    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

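    # Delete the intermediate output collections recorded above. Called after
    # a successful run when --trash-intermediate is in effect (see
    # arv_executor below).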
    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

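    # Recursively check that features requested by the workflow are supported
    # by the selected API: writable InitialWorkDir entries and
    # dockerOutputDirectory are only available with --api=containers.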
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

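    # Assemble the final output collection: copy every file and directory
    # referenced by the output object out of its source collection (or write
    # file literals directly), save cwl.output.json into the new collection,
    # then rewrite the output object's locations to point at it.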
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

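    # If this runner is itself executing inside a Crunch container or job
    # task, record the final output collection on that container or job task
    # so the workflow output shows up on our own work record.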
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

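    # cwltool executor entry point. Validates requested features, uploads
    # dependencies and the job order, optionally just creates/updates a
    # workflow record, and otherwise either submits a runner job/container or
    # runs the workflow steps from this process while polling their states.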
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")

        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                  collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            if kwargs["priority"] != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
            raise Exception("--priority must be in the range 1..1000.")

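        # With --submit (the default), create a single runner job or container
        # on Arvados that executes the workflow; otherwise the workflow steps
        # are dispatched from this process.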
        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map,
                                                priority=kwargs.get("priority"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

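        # Run the workflow from this process: start the background state
        # polling thread, then pull runnable steps off the job iterator,
        # waiting on the condition variable while work is outstanding.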
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)
        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

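        # A submitted runner job produces its own output collection; for runs
        # driven from this process, assemble the output collection here and
        # record it on our own container or job task.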
        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Print version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                             "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

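
# Register the Arvados CWL extension schema and hint names with cwltool so
# documents that use arv: extensions validate, and relax cwltool's Docker
# image name check.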
def add_arv_hints():
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())

    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])


def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    # argparse's action="version" normally prints the version and exits during
    # parse_args(); this guard is only reached if a caller sets the attribute.
    if getattr(arvargs, "version", False):
        print versionstring()
        return

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

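    # Hand off to cwltool's main loop, wiring in the Arvados executor, tool
    # factory, Keep-backed file access, and collection-aware URI resolver.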
    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)