import ruamel.yaml as yaml
from cwltool.errors import WorkflowException
-from cwltool.process import get_feature, UnsupportedRequirement, shortname
+from cwltool.process import UnsupportedRequirement, shortname
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
+from cwltool.job import JobBase
import arvados.collection
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
-class ArvadosContainer(object):
+class ArvadosContainer(JobBase):
"""Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
- def __init__(self, runner):
+ def __init__(self, runner,
+ builder, # type: Builder
+ joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
+ make_path_mapper, # type: Callable[..., PathMapper]
+ requirements, # type: List[Dict[Text, Text]]
+ hints, # type: List[Dict[Text, Text]]
+ name # type: Text
+ ):
+ super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
self.arvrunner = runner
self.running = False
self.uuid = None
def update_pipeline_component(self, r):
pass
- def run(self, dry_run=False, pull_image=True, **kwargs):
+ def run(self, runtimeContext):
+ # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
+ # which calls makeJobRunner() to get a new ArvadosContainer
+ # object. The fields that define execution such as
+ # command_line, environment, etc are set on the
+ # ArvadosContainer object by CommandLineTool.job() before
+ # run() is called.
+
container_request = {
"command": self.command_line,
- "owner_uuid": self.arvrunner.project_uuid,
"name": self.name,
"output_path": self.outdir,
"cwd": self.outdir,
- "priority": kwargs.get("priority"),
+ "priority": runtimeContext.priority,
"state": "Committed",
"properties": {},
}
runtime_constraints = {}
+ if self.arvrunner.project_uuid:
+ container_request["owner_uuid"] = self.arvrunner.project_uuid
+
if self.arvrunner.secret_store.has_secret(self.command_line):
raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
separateDirs=False)
- logger.debug("generatemapper is %s", generatemapper._pathmap)
+ sorteditems = sorted(generatemapper.items(), None, key=lambda n: n[1].target)
+
+ logger.debug("generatemapper is %s", sorteditems)
with Perf(metrics, "createfiles %s" % self.name):
- for f, p in generatemapper.items():
+ for f, p in sorteditems:
if not p.target:
pass
elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
with Perf(metrics, "generatefiles.save_new %s" % self.name):
vwd.save_new()
- for f, p in generatemapper.items():
- if not p.target or self.arvrunner.secret_store.has_secret(p.resolved):
+ prev = None
+ for f, p in sorteditems:
+ if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
+ (prev is not None and p.target.startswith(prev))):
continue
mountpoint = "%s/%s" % (self.outdir, p.target)
mounts[mountpoint] = {"kind": "collection",
"path": p.target}
if p.type.startswith("Writable"):
mounts[mountpoint]["writable"] = True
+ prev = p.target + "/"
container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
if self.environment:
mounts["stdout"] = {"kind": "file",
"path": "%s/%s" % (self.outdir, self.stdout)}
- (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+ (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
if not docker_req:
docker_req = {"dockerImageId": "arvados/jobs"}
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
- pull_image,
+ runtimeContext.pull_image,
self.arvrunner.project_uuid)
- api_req, _ = get_feature(self, "http://arvados.org/cwl#APIRequirement")
+ api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
if api_req:
runtime_constraints["API"] = True
- runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
+ runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
if runtime_req:
if "keep_cache" in runtime_req:
runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20
"writable": True
}
- partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
+ partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
if partition_req:
scheduling_parameters["partitions"] = aslist(partition_req["partition"])
- intermediate_output_req, _ = get_feature(self, "http://arvados.org/cwl#IntermediateOutput")
+ intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
if intermediate_output_req:
self.output_ttl = intermediate_output_req["outputTTL"]
else:
if self.output_ttl < 0:
raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
- # for testing only!
- mounts.update(secret_mounts)
-
container_request["output_ttl"] = self.output_ttl
container_request["mounts"] = mounts
container_request["secret_mounts"] = secret_mounts
container_request["runtime_constraints"] = runtime_constraints
container_request["scheduling_parameters"] = scheduling_parameters
- enable_reuse = kwargs.get("enable_reuse", True)
+ enable_reuse = runtimeContext.enable_reuse
if enable_reuse:
- reuse_req, _ = get_feature(self, "http://arvados.org/cwl#ReuseRequirement")
+ reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
container_request["use_existing"] = enable_reuse
- if kwargs.get("runnerjob", "").startswith("arvwf:"):
- wfuuid = kwargs["runnerjob"][6:kwargs["runnerjob"].index("#")]
+ if runtimeContext.runnerjob.startswith("arvwf:"):
+ wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
if container_request["name"] == "main":
container_request["name"] = wfrecord["name"]
container_request["properties"]["template_uuid"] = wfuuid
+ self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
try:
- response = self.arvrunner.api.container_requests().create(
- body=container_request
- ).execute(num_retries=self.arvrunner.num_retries)
+ if runtimeContext.submit_request_uuid:
+ response = self.arvrunner.api.container_requests().update(
+ uuid=runtimeContext.submit_request_uuid,
+ body=container_request
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ response = self.arvrunner.api.container_requests().create(
+ body=container_request
+ ).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
- self.arvrunner.processes[self.uuid] = self
+ self.arvrunner.process_submitted(self)
if response["state"] == "Final":
logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
- self.done(response)
else:
logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
except Exception as e:
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self))
+ done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
if record["output_uuid"]:
if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
processStatus = "permanentFail"
finally:
self.output_callback(outputs, processStatus)
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]
class RunnerContainer(Runner):
"""Submit and manage a container that runs arvados-cwl-runner."""
- def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+ def arvados_job_spec(self, runtimeContext):
"""Create an Arvados container request for this workflow.
The returned dict can be used to create a container passed as
self.job_order[param] = {"$include": mnt}
container_req = {
- "owner_uuid": self.arvrunner.project_uuid,
"name": self.name,
"output_path": "/var/spool/cwl",
"cwd": "/var/spool/cwl",
"properties": {}
}
- # for testing
- container_req["mounts"].update(secret_mounts)
-
if self.tool.tool.get("id", "").startswith("keep:"):
sp = self.tool.tool["id"].split('/')
workflowcollection = sp[0][5:]
container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
- command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
+ # --local means execute the workflow instead of submitting a container request
+ # --api=containers means use the containers API
+ # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
+ # --disable-validate because we already validated so don't need to do it again
+ # --eval-timeout is the timeout for javascript invocation
+ # --thread-count is the number of threads to use for job submission
+ # --enable/disable-reuse sets desired job reuse
+ command = ["arvados-cwl-runner",
+ "--local",
+ "--api=containers",
+ "--no-log-timestamps",
+ "--disable-validate",
+ "--eval-timeout=%s" % self.arvrunner.eval_timeout,
+ "--thread-count=%s" % self.arvrunner.thread_count,
+ "--enable-reuse" if self.enable_reuse else "--disable-reuse"]
+
if self.output_name:
command.append("--output-name=" + self.output_name)
container_req["output_name"] = self.output_name
if self.output_tags:
command.append("--output-tags=" + self.output_tags)
- if kwargs.get("debug"):
+ if runtimeContext.debug:
command.append("--debug")
- if self.enable_reuse:
- command.append("--enable-reuse")
- else:
- command.append("--disable-reuse")
+ if runtimeContext.storage_classes != "default":
+ command.append("--storage-classes=" + runtimeContext.storage_classes)
if self.on_error:
command.append("--on-error=" + self.on_error)
if self.arvrunner.project_uuid:
command.append("--project-uuid="+self.arvrunner.project_uuid)
- command.append("--eval-timeout=%s" % self.arvrunner.eval_timeout)
-
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
return container_req
- def run(self, *args, **kwargs):
- kwargs["keepprefix"] = "keep:"
- job_spec = self.arvados_job_spec(*args, **kwargs)
- job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+ def run(self, runtimeContext):
+ runtimeContext.keepprefix = "keep:"
+ job_spec = self.arvados_job_spec(runtimeContext)
+ if self.arvrunner.project_uuid:
+ job_spec["owner_uuid"] = self.arvrunner.project_uuid
- response = self.arvrunner.api.container_requests().create(
- body=job_spec
- ).execute(num_retries=self.arvrunner.num_retries)
+ if runtimeContext.submit_request_uuid:
+ response = self.arvrunner.api.container_requests().update(
+ uuid=runtimeContext.submit_request_uuid,
+ body=job_spec
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ response = self.arvrunner.api.container_requests().create(
+ body=job_spec
+ ).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
- self.arvrunner.processes[self.uuid] = self
+ self.arvrunner.process_submitted(self)
logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])
- if response["state"] == "Final":
- self.done(response)
-
def done(self, record):
try:
container = self.arvrunner.api.containers().get(
self.arvrunner.output_callback({}, "permanentFail")
else:
super(RunnerContainer, self).done(container)
- finally:
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]