X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cb690390d4f253c3bbb9c543e243cf988f39fbb3..HEAD:/sdk/cwl/arvados_cwl/arvcontainer.py

diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 84b98378f4..a340f30e95 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -2,10 +2,6 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-from future import standard_library
-standard_library.install_aliases()
-from builtins import str
-
 import logging
 import json
 import os
@@ -27,6 +23,9 @@ from cwltool.job import JobBase
 
 import arvados.collection
 
+import crunchstat_summary.summarizer
+import crunchstat_summary.reader
+
 from .arvdocker import arv_docker_get_image
 from . import done
 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
@@ -44,7 +43,7 @@ def cleanup_name_for_collection(name):
 class ArvadosContainer(JobBase):
     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
 
-    def __init__(self, runner, job_runtime,
+    def __init__(self, runner, job_runtime, globpatterns,
                  builder,           # type: Builder
                  joborder,          # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                  make_path_mapper,  # type: Callable[..., PathMapper]
@@ -58,6 +57,7 @@ class ArvadosContainer(JobBase):
         self.running = False
         self.uuid = None
         self.attempt_count = 0
+        self.globpatterns = globpatterns
 
     def update_pipeline_component(self, r):
         pass
@@ -367,11 +367,71 @@ class ArvadosContainer(JobBase):
                 logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
                                self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
 
+        if self.arvrunner.api._rootDesc["revision"] >= "20240502" and self.globpatterns:
+            output_glob = []
+            for gb in self.globpatterns:
+                gb = self.builder.do_eval(gb)
+                if not gb:
+                    continue
+                for gbeval in aslist(gb):
+                    if gbeval.startswith(self.outdir+"/"):
+                        gbeval = gbeval[len(self.outdir)+1:]
+                    while gbeval.startswith("./"):
+                        gbeval = gbeval[2:]
+
+                    if gbeval in (self.outdir, "", "."):
+                        output_glob.append("**")
+                    elif gbeval.endswith("/"):
+                        output_glob.append(gbeval+"**")
+                    else:
+                        output_glob.append(gbeval)
+                        output_glob.append(gbeval + "/**")
+
+            if "**" in output_glob:
+                # if it's going to match all, prefer not to provide it
+                # at all.
+                output_glob.clear()
+
+            if output_glob:
+                # Tools should either use cwl.output.json or
+                # outputBinding globs. However, one CWL conformance
+                # test has both, so we need to make sure we collect
+                # cwl.output.json in this case. That test uses
+                # cwl.output.json to return a string, but also uses
+                # outputBinding.
+                output_glob.append("cwl.output.json")
+
+                # It could happen that a tool creates cwl.output.json,
+                # references a file, but also uses an outputBinding
+                # glob that doesn't include the file being referenced.
+                #
+                # In this situation, output_glob will only match the
+                # pattern we know about. If cwl.output.json referred
+                # to other files in the output, those would be
+                # missing. We could upload the entire output, but we
+                # currently have no way of knowing at this point
+                # whether cwl.output.json will be used this way.
+                #
+                # Because this is a corner case, I'm inclined to leave
+                # this as a known issue for now. No conformance tests
+                # do this and I'd even be inclined to have it ruled
+                # incompatible in the CWL spec if it did come up.
+                # That said, in retrospect it would have been good to
+                # require CommandLineTool to declare when it expects
+                # cwl.output.json.
+
+                container_request["output_glob"] = output_glob
+
         ram_multiplier = [1]
 
         oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
-        if oom_retry_req and oom_retry_req.get('memoryRetryMultipler'):
-            ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
+        if oom_retry_req:
+            if oom_retry_req.get('memoryRetryMultiplier'):
+                ram_multiplier.append(oom_retry_req.get('memoryRetryMultiplier'))
+            elif oom_retry_req.get('memoryRetryMultipler'):
+                ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
+            else:
+                ram_multiplier.append(2)
 
         if runtimeContext.runnerjob.startswith("arvwf:"):
             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
@@ -492,11 +552,14 @@ class ArvadosContainer(JobBase):
             else:
                 processStatus = "permanentFail"
 
-            if processStatus == "permanentFail" and record["log_uuid"]:
-                logc = arvados.collection.CollectionReader(record["log_uuid"],
-                                                           api_client=self.arvrunner.api,
-                                                           keep_client=self.arvrunner.keep_client,
-                                                           num_retries=self.arvrunner.num_retries)
+            logc = None
+            if record["log_uuid"]:
+                logc = arvados.collection.Collection(record["log_uuid"],
+                                                     api_client=self.arvrunner.api,
+                                                     keep_client=self.arvrunner.keep_client,
+                                                     num_retries=self.arvrunner.num_retries)
+
+            if processStatus == "permanentFail" and logc is not None:
                 label = self.arvrunner.label(self)
                 done.logtail(
                     logc, logger.error,
@@ -522,6 +585,28 @@ class ArvadosContainer(JobBase):
                     uuid=self.uuid,
                     body={"container_request": {"properties": properties}}
                 ).execute(num_retries=self.arvrunner.num_retries)
+
+            if logc is not None and self.job_runtime.enable_usage_report is not False:
+                try:
+                    summarizer = crunchstat_summary.summarizer.ContainerRequestSummarizer(
+                        record,
+                        collection_object=logc,
+                        label=self.name,
+                        arv=self.arvrunner.api)
+                    summarizer.run()
+                    with logc.open("usage_report.html", "wt") as mr:
+                        mr.write(summarizer.html_report())
+                    logc.save()
+
+                    # Post warnings about nodes that are under-utilized.
+                    for rc in summarizer._recommend_gen(lambda x: x):
+                        self.job_runtime.usage_report_notes.append(rc)
+
+                except Exception as e:
+                    logger.warning("%s unable to generate resource usage report",
+                                   self.arvrunner.label(self),
+                                   exc_info=(e if self.arvrunner.debug else False))
+
         except WorkflowException as e:
             # Only include a stack trace if in debug mode.
             # A stack trace may obfuscate more useful output about the workflow.
@@ -699,6 +784,12 @@ class RunnerContainer(Runner):
         if runtimeContext.prefer_cached_downloads:
            command.append("--prefer-cached-downloads")
 
+        if runtimeContext.enable_usage_report is True:
+            command.append("--enable-usage-report")
+
+        if runtimeContext.enable_usage_report is False:
+            command.append("--disable-usage-report")
+
         if self.fast_parser:
             command.append("--fast-parser")
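
Note on the output_glob hunk above: the following is a minimal, standalone sketch of the same glob-normalization rules, runnable outside Arvados. The function name normalize_globs, the hard-coded outdir default, and the example patterns are illustrative stand-ins only (the real code walks self.globpatterns, evaluates each entry with self.builder.do_eval(), and strips self.outdir); it is not part of the patch.

    # Sketch of the normalization applied to each already-evaluated glob
    # pattern before it is stored in container_request["output_glob"].
    def normalize_globs(patterns, outdir="/var/spool/cwl"):
        output_glob = []
        for gbeval in patterns:
            # Make patterns relative to the container output directory.
            if gbeval.startswith(outdir + "/"):
                gbeval = gbeval[len(outdir) + 1:]
            while gbeval.startswith("./"):
                gbeval = gbeval[2:]

            if gbeval in (outdir, "", "."):
                output_glob.append("**")            # matches the whole output
            elif gbeval.endswith("/"):
                output_glob.append(gbeval + "**")   # directory prefix
            else:
                output_glob.append(gbeval)          # the path itself...
                output_glob.append(gbeval + "/**")  # ...or a directory of that name

        if "**" in output_glob:
            # Matching everything is equivalent to sending no glob at all.
            output_glob.clear()
        return output_glob

    print(normalize_globs(["./out/*.txt"]))  # ['out/*.txt', 'out/*.txt/**']
    print(normalize_globs(["."]))            # [] -- matches all, so omitted

Appending both the bare pattern and pattern + "/**" presumably covers the case where a glob matches a directory, so that the directory's contents are still collected.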
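
The OutOfMemoryRetry hunk keeps honoring the historical misspelling "memoryRetryMultipler" while preferring the corrected "memoryRetryMultiplier", and falls back to doubling RAM when neither field is given. A hedged sketch of that selection logic follows; the requirement values in the usage lines are made up for illustration and do not come from the patch.

    def retry_ram_multipliers(oom_retry_req):
        ram_multiplier = [1]  # base multiplier for the first attempt
        if oom_retry_req:
            if oom_retry_req.get('memoryRetryMultiplier'):
                ram_multiplier.append(oom_retry_req.get('memoryRetryMultiplier'))
            elif oom_retry_req.get('memoryRetryMultipler'):
                # deprecated spelling, still accepted
                ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
            else:
                ram_multiplier.append(2)  # default: retry with twice the RAM
        return ram_multiplier

    print(retry_ram_multipliers({"memoryRetryMultiplier": 3}))   # [1, 3]
    print(retry_ram_multipliers({"memoryRetryMultipler": 1.5}))  # [1, 1.5]
    print(retry_ram_multipliers(None))                           # [1] -- no retry requirement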
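
The enable_usage_report handling is tri-state: in ArvadosContainer the report is generated unless the setting is explicitly False (the "is not False" test), and in RunnerContainer only an explicit True or False adds --enable-usage-report or --disable-usage-report to the child runner's command line, leaving the default behavior untouched when the option is unset. A small sketch under that reading; the helper below is illustrative and not part of arvados_cwl.

    def usage_report_args(enable_usage_report):
        # Mirror the flag propagation added to RunnerContainer: None adds nothing.
        command = []
        if enable_usage_report is True:
            command.append("--enable-usage-report")
        if enable_usage_report is False:
            command.append("--disable-usage-report")
        return command

    print(usage_report_args(True))   # ['--enable-usage-report']
    print(usage_report_args(False))  # ['--disable-usage-report']
    print(usage_report_args(None))   # [] -- keep the runner's default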