import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.utils import aslist
+from cwltool.builder import substitute
import arvados.collection
import ruamel.yaml as yaml
from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
+from ._version import __version__
logger = logging.getLogger('arvados.cwl-runner')
-cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+# NOTE(review): this overrides cwltool's ACCEPTLIST_RE with a pattern that
+# matches any string, where it was previously limited to [a-zA-Z0-9._+-].
+# Presumably intended to switch off cwltool's character filtering -- confirm.
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
+
+def trim_listing(obj):
+ """Remove 'listing' field from Directory objects that are keep references.
+
+ When Directory objects represent Keep references, it is redundant and
+ potentially very expensive to pass fully enumerated Directory objects
+ between instances of cwl-runner (e.g. when submitting a job, or using the
+ RunInSingleContainer feature), so delete the 'listing' field when it is
+ safe to do so.
+ """
-def del_listing(obj):
if obj.get("location", "").startswith("keep:") and "listing" in obj:
del obj["listing"]
if obj.get("location", "").startswith("_:"):
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
if docker_req:
+ if docker_req.get("dockerOutputDirectory"):
+ # TODO: can be supported by containers API, but not jobs API.
+ raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool)
+def upload_instance(arvrunner, name, tool, job_order):
+    """Upload a tool and its job order so they can be run remotely.
+
+    Steps, in order:
+      1. upload_docker() for the tool.
+      2. For every tool input that declares ``secondaryFiles`` and has a
+         value in ``job_order``, add a ``secondaryFiles`` list to each File
+         object that does not already have one.  ``job_order`` is modified
+         in place.
+      3. upload_dependencies() for the tool document, then for the job
+         order.
+      4. Remove the ``id`` key from ``job_order`` if present.
+
+    Returns the result of upload_dependencies() for the tool document.
+    """
+    upload_docker(arvrunner, tool)
+
+    def set_secondary(fileobj, patterns):
+        # Fill in secondaryFiles on a File object, or on every element of a
+        # list of them; other values are left untouched.
+        if isinstance(fileobj, list):
+            for item in fileobj:
+                set_secondary(item, patterns)
+        elif (isinstance(fileobj, dict) and fileobj.get("class") == "File"
+              and "secondaryFiles" not in fileobj):
+            fileobj["secondaryFiles"] = [
+                {"location": substitute(fileobj["location"], sf), "class": "File"}
+                for sf in patterns]
+
+    for t in tool.tool["inputs"]:
+        input_name = shortname(t["id"])
+        if input_name in job_order and t.get("secondaryFiles"):
+            # secondaryFiles may be a single pattern or a list of patterns;
+            # aslist() keeps a lone string from being iterated per character.
+            set_secondary(job_order[input_name], aslist(t["secondaryFiles"]))
+
+    workflowmapper = upload_dependencies(arvrunner,
+                                         name,
+                                         tool.doc_loader,
+                                         tool.tool,
+                                         tool.tool["id"],
+                                         True)
+    # Return value is unused here; the call is kept for its side effects
+    # (presumably uploading the files referenced by the job order).
+    upload_dependencies(arvrunner,
+                        os.path.basename(job_order.get("id", "#")),
+                        tool.doc_loader,
+                        job_order,
+                        job_order.get("id", "#"),
+                        False)
+
+    if "id" in job_order:
+        del job_order["id"]
+
+    return workflowmapper
+
+def arvados_jobs_image(arvrunner):
+    """Return the arvados/jobs image name tagged with this package's version.
+
+    arv_docker_get_image() is called for that image first.  Any exception it
+    raises is re-raised as a plain Exception stating that the image is not
+    available, with the original error text appended.
+    """
+    image_name = "arvados/jobs:" + __version__
+    pull_request = {"dockerPull": image_name}
+    try:
+        arv_docker_get_image(arvrunner.api, pull_request, True, arvrunner.project_uuid)
+    except Exception as err:
+        raise Exception("Docker image %s is not available\n%s" % (image_name, err))
+    else:
+        return image_name
class Runner(object):
- def __init__(self, runner, tool, job_order, enable_reuse):
+ def __init__(self, runner, tool, job_order, enable_reuse,
+ output_name, output_tags, submit_runner_ram=0,
+ name=None):
+ """Record the runner, tool, job order and submission options.
+
+ submit_runner_ram falls back to 1024 when it is falsy (0 or None);
+ a negative value raises Exception.  name may be None, in which case
+ arvados_job_spec() derives it from the tool id.
+ """
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
self.running = False
self.enable_reuse = enable_reuse
self.uuid = None
+ # Assigned in done() from record["output"].
+ self.final_output = None
+ self.output_name = output_name
+ self.output_tags = output_tags
+ self.name = name
+
+ if submit_runner_ram:
+ self.submit_runner_ram = submit_runner_ram
+ else:
+ # Default applied when the caller passes 0 or None.
+ self.submit_runner_ram = 1024
+
+ # Only a negative argument can still be non-positive at this point.
+ if self.submit_runner_ram <= 0:
+ raise Exception("Value of --submit-runner-ram must be greater than zero")
def update_pipeline_component(self, record):
+ """Do nothing with record; presumably a hook for subclasses -- confirm."""
pass
def arvados_job_spec(self, *args, **kwargs):
+ """Upload the tool and job order, then return the workflow mapper.
+
+ If self.name is None it is set to the basename of the tool's id.
+ upload_instance() does the uploading; afterwards trim_listing is
+ applied to every Directory object in self.job_order.
+ """
- upload_docker(self.arvrunner, self.tool)
-
- self.name = os.path.basename(self.tool.tool["id"])
-
- workflowmapper = upload_dependencies(self.arvrunner,
- self.name,
- self.tool.doc_loader,
- self.tool.tool,
- self.tool.tool["id"],
- True)
-
- jobmapper = upload_dependencies(self.arvrunner,
- os.path.basename(self.job_order.get("id", "#")),
- self.tool.doc_loader,
- self.job_order,
- self.job_order.get("id", "#"),
- False)
-
- adjustDirObjs(self.job_order, del_listing)
-
- if "id" in self.job_order:
- del self.job_order["id"]
-
+ # A name passed to __init__ takes precedence over the tool id.
+ if self.name is None:
+ self.name = os.path.basename(self.tool.tool["id"])
+ workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
+ adjustDirObjs(self.job_order, trim_listing)
return workflowmapper
-
def done(self, record):
if record["state"] == "Complete":
if record.get("exit_code") is not None:
else:
processStatus = "permanentFail"
- outputs = None
+ outputs = {}
try:
try:
- outc = arvados.collection.Collection(record["output"])
- with outc.open("cwl.output.json") as f:
- outputs = json.load(f)
+ self.final_output = record["output"]
+ outc = arvados.collection.CollectionReader(self.final_output,
+ api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+ if "cwl.output.json" in outc:
+ with outc.open("cwl.output.json") as f:
+ if f.size() > 0:
+ outputs = json.load(f)
def keepify(fileobj):
path = fileobj["location"]
if not path.startswith("keep:"):
adjustFileObjs(outputs, keepify)
adjustDirObjs(outputs, keepify)
except Exception as e:
- logger.error("While getting final output object: %s", e)
+ logger.exception("While getting final output object: %s", e)
self.arvrunner.output_callback(outputs, processStatus)
finally:
- del self.arvrunner.processes[record["uuid"]]
+ if record["uuid"] in self.arvrunner.processes:
+ del self.arvrunner.processes[record["uuid"]]