# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute, Builder
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')


def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a
    'location' field, remove it.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
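
# Illustrative example (not from the original source): a File literal such as
#   {"class": "File", "location": "_:b0a88a4c", "basename": "a.txt", "contents": "hi"}
# becomes {"class": "File", "basename": "a.txt", "contents": "hi"} once the
# anonymous "_:" location is trimmed.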


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
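
# Illustrative example (hypothetical document): given
#   {"inputs": [{"id": "x", "default": {"class": "File", "path": "a.txt"}}]}
# find_defaults calls op() exactly once, on the list element that carries the
# "default" key.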


def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
        job=joborder,
        files=[],                    # type: List[Dict[Text, Text]]
        bindings=[],                 # type: List[Dict[Text, Any]]
        schemaDefs={},               # type: Dict[Text, Dict[Text, Any]]
        names=None,                  # type: Names
        requirements=requirements,   # type: List[Dict[Text, Any]]
        hints=hints,                 # type: List[Dict[Text, Any]]
        resources={},                # type: Dict[str, int]
        mutation_manager=None,       # type: Optional[MutationManager]
        formatgraph=None,            # type: Optional[Graph]
        make_fs_access=None,         # type: Type[StdFsAccess]
        fs_access=None,              # type: StdFsAccess
        job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
        timeout=runtimeContext.eval_timeout,                     # type: float
        debug=runtimeContext.debug,                              # type: bool
        js_console=runtimeContext.js_console,                    # type: bool
        force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
        loadListing="",              # type: Text
        outdir="",                   # type: Text
        tmpdir="",                   # type: Text
        stagedir="",                 # type: Text
        cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
    )
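
# Illustrative usage (hypothetical tool object): elsewhere in this module the
# builder is constructed the same way to evaluate CWL expressions, e.g.
#   builder = make_builder(job_order, tool.hints, tool.requirements,
#                          ArvRuntimeContext(), tool.metadata)
#   pattern = builder.do_eval("$(inputs.reads.basename).bai", context=primary)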


def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None


primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))


def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                specs.append({"pattern": pattern})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = sf["basename"]
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                found.append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
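
# Illustrative expansion (hypothetical paths): for a primary file
#   {"class": "File", "location": "keep:<pdh>/reads.bam"}
# and a spec {"pattern": ".bai", "required": true}, substitute() appends the
# suffix to give "keep:<pdh>/reads.bam.bai"; a pattern starting with "^" would
# instead replace the primary's ".bam" extension.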


def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)


def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)
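
    # Illustrative result (hypothetical workflow): scandeps returns the
    # external references found in the document as File/Directory objects, e.g.
    #   [{"class": "File", "location": "file:///home/user/wf/tool.cwl"},
    #    {"class": "File", "location": "file:///home/user/wf/data/input.txt"}]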

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
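
    # Illustrative contents (hypothetical identifiers): after the loop,
    # uuid_map pairs each collection uuid with its portable data hash, e.g.
    #   {"zzzzz-4zz18-zzzzzzzzzzzzzzz": "99999999999999999999999999999999+99"}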

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
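
# Illustrative mapping (hypothetical values): after upload_dependencies() runs,
# a local reference such as "file:///home/user/wf/script.py" resolves through
# mapper.mapper(...).resolved to a keep reference like
#   "keep:99999999999999999999999999999999+99/script.py"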


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, arvrunner.project_uuid,
                                                       arvrunner.runtimeContext.force_docker_pull,
                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
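
# Illustrative requirement (hypothetical image) handled above:
#   {"class": "DockerRequirement", "dockerPull": "ubuntu:20.04"}
# arv_docker_get_image() makes sure the image exists in Arvados, pulling and
# uploading it to Keep if necessary.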


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError(
                        "Embedded process object is missing required 'id' field, add an 'id' or upgrade to cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                    arvrunner.api, v, True,
                    arvrunner.project_uuid,
                    arvrunner.runtimeContext.force_docker_pull,
                    arvrunner.runtimeContext.tmp_outdir_prefix)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed
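
# Illustrative rewrite (hypothetical ids): pack() may rewrite an embedded tool
# id such as "file:///home/user/wf/tool.cwl" to "#tool.cwl"; rewrite_to_orig
# inverts that mapping so merged_map lookups in visit() still match the
# original ids.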


def tag_git_version(packed, tool):
    # "tool" is passed explicitly so this helper does not rely on a
    # module-level variable of that name.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(
                ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
                stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any missing required
    # parameters; we don't want that here, so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
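
# Illustrative transformation (hypothetical paths): an input object like
#   {"bam": {"class": "File", "location": "file:///data/reads.bam"}}
# is returned with its location rewritten to a keep reference such as
#   {"bam": {"class": "File", "location": "keep:99999999999999999999999999999999+99/reads.bam"}}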


FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])


def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map
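
# Illustrative structure (hypothetical ids): merged_map associates each tool
# document id with a FileUpdates tuple, e.g.
#   {"file:///wf/tool.cwl": FileUpdates(
#       resolved={"file:///wf/script.py": "keep:<pdh>/script.py"},
#       secondaryFiles={})}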


def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                          arvrunner.runtimeContext.force_docker_pull,
                                                          arvrunner.runtimeContext.tmp_outdir_prefix)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
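
# Illustrative query (hypothetical values): the list call above searches for an
# existing copy by content and name prefix, e.g.
#   filters=[["portable_data_hash", "=", "99999999999999999999999999999999+99"],
#            ["name", "like", "my-workflow%"]]
# so re-registering an identical packed workflow reuses the stored collection.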


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]
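
        # Illustrative hint (CWL YAML, hypothetical values) consumed by the
        # block above:
        #   hints:
        #     arv:WorkflowRunnerResources:
        #       coresMin: 2
        #       ramMin: 2048
        #       keep_cache: 512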

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
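
# Illustrative conventions in Runner.done() (hypothetical output paths): a
# runner container exiting with code 33 maps to "UnsupportedRequirement";
# keepify() rewrites a relative output location such as "out/result.txt" to
#   "keep:99999999999999999999999999999999+99/out/result.txt"
# using the completed runner's output collection.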