X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/606c29cdbb9012ee99043a2da9f28b2cd302e5e4..cb97316068f201ffd03f54d67074a83601c2bb45:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 0bec692643..4953afa797 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -14,9 +14,10 @@ import uuid import ruamel.yaml as yaml from cwltool.errors import WorkflowException -from cwltool.process import get_feature, UnsupportedRequirement, shortname +from cwltool.process import UnsupportedRequirement, shortname from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class from cwltool.utils import aslist +from cwltool.job import JobBase import arvados.collection @@ -30,10 +31,18 @@ from .perf import Perf logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') -class ArvadosContainer(object): +class ArvadosContainer(JobBase): """Submit and manage a Crunch container request for executing a CWL CommandLineTool.""" - def __init__(self, runner): + def __init__(self, runner, + builder, # type: Builder + joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]] + make_path_mapper, # type: Callable[..., PathMapper] + requirements, # type: List[Dict[Text, Text]] + hints, # type: List[Dict[Text, Text]] + name # type: Text + ): + super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name) self.arvrunner = runner self.running = False self.uuid = None @@ -41,7 +50,7 @@ class ArvadosContainer(object): def update_pipeline_component(self, r): pass - def run(self, dry_run=False, pull_image=True, **kwargs): + def run(self, runtimeContext): # ArvadosCommandTool subclasses from cwltool.CommandLineTool, # which calls makeJobRunner() to get a new ArvadosContainer # object. The fields that define execution such as @@ -54,7 +63,7 @@ class ArvadosContainer(object): "name": self.name, "output_path": self.outdir, "cwd": self.outdir, - "priority": kwargs.get("priority"), + "priority": runtimeContext.priority, "state": "Committed", "properties": {}, } @@ -190,20 +199,20 @@ class ArvadosContainer(object): mounts["stdout"] = {"kind": "file", "path": "%s/%s" % (self.outdir, self.stdout)} - (docker_req, docker_is_req) = get_feature(self, "DockerRequirement") + (docker_req, docker_is_req) = self.get_requirement("DockerRequirement") if not docker_req: docker_req = {"dockerImageId": "arvados/jobs"} container_request["container_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, - pull_image, + runtimeContext.pull_image, self.arvrunner.project_uuid) - api_req, _ = get_feature(self, "http://arvados.org/cwl#APIRequirement") + api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement") if api_req: runtime_constraints["API"] = True - runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints") + runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints") if runtime_req: if "keep_cache" in runtime_req: runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20 @@ -217,11 +226,11 @@ class ArvadosContainer(object): "writable": True } - partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement") + partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement") if partition_req: scheduling_parameters["partitions"] = aslist(partition_req["partition"]) - intermediate_output_req, _ = get_feature(self, "http://arvados.org/cwl#IntermediateOutput") + intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput") if intermediate_output_req: self.output_ttl = intermediate_output_req["outputTTL"] else: @@ -236,15 +245,15 @@ class ArvadosContainer(object): container_request["runtime_constraints"] = runtime_constraints container_request["scheduling_parameters"] = scheduling_parameters - enable_reuse = kwargs.get("enable_reuse", True) + enable_reuse = runtimeContext.enable_reuse if enable_reuse: - reuse_req, _ = get_feature(self, "http://arvados.org/cwl#ReuseRequirement") + reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement") if reuse_req: enable_reuse = reuse_req["enableReuse"] container_request["use_existing"] = enable_reuse - if kwargs.get("runnerjob", "").startswith("arvwf:"): - wfuuid = kwargs["runnerjob"][6:kwargs["runnerjob"].index("#")] + if runtimeContext.runnerjob.startswith("arvwf:"): + wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")] wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries) if container_request["name"] == "main": container_request["name"] = wfrecord["name"] @@ -253,9 +262,9 @@ class ArvadosContainer(object): self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback) try: - if kwargs.get("submit_request_uuid"): + if runtimeContext.submit_request_uuid: response = self.arvrunner.api.container_requests().update( - uuid=kwargs["submit_request_uuid"], + uuid=runtimeContext.submit_request_uuid, body=container_request ).execute(num_retries=self.arvrunner.num_retries) else: @@ -329,7 +338,7 @@ class ArvadosContainer(object): class RunnerContainer(Runner): """Submit and manage a container that runs arvados-cwl-runner.""" - def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs): + def arvados_job_spec(self, runtimeContext): """Create an Arvados container request for this workflow. The returned dict can be used to create a container passed as @@ -424,9 +433,12 @@ class RunnerContainer(Runner): if self.output_tags: command.append("--output-tags=" + self.output_tags) - if kwargs.get("debug"): + if runtimeContext.debug: command.append("--debug") + if kwargs.get("storage_classes") and kwargs.get("storage_classes") != self.default_storage_classes: + command.append("--storage-classes=" + kwargs.get("storage_classes")) + if self.on_error: command.append("--on-error=" + self.on_error) @@ -446,15 +458,15 @@ class RunnerContainer(Runner): return container_req - def run(self, **kwargs): - kwargs["keepprefix"] = "keep:" - job_spec = self.arvados_job_spec(**kwargs) + def run(self, runtimeContext): + runtimeContext.keepprefix = "keep:" + job_spec = self.arvados_job_spec(runtimeContext) if self.arvrunner.project_uuid: job_spec["owner_uuid"] = self.arvrunner.project_uuid - if kwargs.get("submit_request_uuid"): + if runtimeContext.submit_request_uuid: response = self.arvrunner.api.container_requests().update( - uuid=kwargs["submit_request_uuid"], + uuid=runtimeContext.submit_request_uuid, body=job_spec ).execute(num_retries=self.arvrunner.num_retries) else: