import ruamel.yaml as yaml
from cwltool.errors import WorkflowException
-from cwltool.process import get_feature, UnsupportedRequirement, shortname
+from cwltool.process import UnsupportedRequirement, shortname
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
+from cwltool.job import JobBase
import arvados.collection
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
-class ArvadosContainer(object):
+class ArvadosContainer(JobBase):
"""Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
- def __init__(self, runner):
+ def __init__(self, runner,
+ builder, # type: Builder
+ joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
+ make_path_mapper, # type: Callable[..., PathMapper]
+ requirements, # type: List[Dict[Text, Text]]
+ hints, # type: List[Dict[Text, Text]]
+ name # type: Text
+ ):
+ super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
self.arvrunner = runner
self.running = False
self.uuid = None
def update_pipeline_component(self, r):
pass
- def run(self, dry_run=False, pull_image=True, **kwargs):
+ def run(self, runtimeContext):
# ArvadosCommandTool subclasses from cwltool.CommandLineTool,
# which calls makeJobRunner() to get a new ArvadosContainer
# object. The fields that define execution such as
"name": self.name,
"output_path": self.outdir,
"cwd": self.outdir,
- "priority": kwargs.get("priority"),
+ "priority": runtimeContext.priority,
"state": "Committed",
"properties": {},
}
mounts["stdout"] = {"kind": "file",
"path": "%s/%s" % (self.outdir, self.stdout)}
- (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+ (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
if not docker_req:
docker_req = {"dockerImageId": "arvados/jobs"}
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
- pull_image,
+ runtimeContext.pull_image,
self.arvrunner.project_uuid)
- api_req, _ = get_feature(self, "http://arvados.org/cwl#APIRequirement")
+ api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
if api_req:
runtime_constraints["API"] = True
- runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
+ runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
if runtime_req:
if "keep_cache" in runtime_req:
runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20
"writable": True
}
- partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
+ partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
if partition_req:
scheduling_parameters["partitions"] = aslist(partition_req["partition"])
- intermediate_output_req, _ = get_feature(self, "http://arvados.org/cwl#IntermediateOutput")
+ intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
if intermediate_output_req:
self.output_ttl = intermediate_output_req["outputTTL"]
else:
if self.output_ttl < 0:
raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
+ if self.timelimit is not None:
+ scheduling_parameters["max_run_time"] = self.timelimit
+
container_request["output_ttl"] = self.output_ttl
container_request["mounts"] = mounts
container_request["secret_mounts"] = secret_mounts
container_request["runtime_constraints"] = runtime_constraints
container_request["scheduling_parameters"] = scheduling_parameters
- enable_reuse = kwargs.get("enable_reuse", True)
+ enable_reuse = runtimeContext.enable_reuse
if enable_reuse:
- reuse_req, _ = get_feature(self, "http://arvados.org/cwl#ReuseRequirement")
+ reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
container_request["use_existing"] = enable_reuse
- if kwargs.get("runnerjob", "").startswith("arvwf:"):
- wfuuid = kwargs["runnerjob"][6:kwargs["runnerjob"].index("#")]
+ if runtimeContext.runnerjob.startswith("arvwf:"):
+ wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
if container_request["name"] == "main":
container_request["name"] = wfrecord["name"]
self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
try:
- if kwargs.get("submit_request_uuid"):
+ if runtimeContext.submit_request_uuid:
response = self.arvrunner.api.container_requests().update(
- uuid=kwargs["submit_request_uuid"],
+ uuid=runtimeContext.submit_request_uuid,
body=container_request
).execute(num_retries=self.arvrunner.num_retries)
else:
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self))
+ done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
if record["output_uuid"]:
if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
class RunnerContainer(Runner):
"""Submit and manage a container that runs arvados-cwl-runner."""
- def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+ def arvados_job_spec(self, runtimeContext):
"""Create an Arvados container request for this workflow.
The returned dict can be used to create a container passed as
if self.output_tags:
command.append("--output-tags=" + self.output_tags)
- if kwargs.get("debug"):
+ if runtimeContext.debug:
command.append("--debug")
+ if runtimeContext.storage_classes != "default":
+ command.append("--storage-classes=" + runtimeContext.storage_classes)
+
if self.on_error:
command.append("--on-error=" + self.on_error)
return container_req
- def run(self, **kwargs):
- kwargs["keepprefix"] = "keep:"
- job_spec = self.arvados_job_spec(**kwargs)
+ def run(self, runtimeContext):
+ runtimeContext.keepprefix = "keep:"
+ job_spec = self.arvados_job_spec(runtimeContext)
if self.arvrunner.project_uuid:
job_spec["owner_uuid"] = self.arvrunner.project_uuid
- if kwargs.get("submit_request_uuid"):
+ if runtimeContext.submit_request_uuid:
response = self.arvrunner.api.container_requests().update(
- uuid=kwargs["submit_request_uuid"],
+ uuid=runtimeContext.submit_request_uuid,
body=job_spec
).execute(num_retries=self.arvrunner.num_retries)
else: