from functools import partial
import logging
import json
+import re
+import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement
from cwltool.load_tool import fetch_document
+from cwltool.pathmapper import adjustFileObjs
import arvados.collection
logger = logging.getLogger('arvados.cwl-runner')
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse):
self.arvrunner = runner
self.job_order = job_order
self.running = False
self.enable_reuse = enable_reuse
+ self.uuid = None
    def update_pipeline_component(self, record):
        # No-op in this base class; presumably subclasses that track a
        # pipeline component override this to record job state from
        # `record` -- TODO confirm against subclass implementations
        # (not visible in this chunk).
        pass
def arvados_job_spec(self, *args, **kwargs):
self.upload_docker(self.tool)
- workflowfiles = set()
- jobfiles = set()
- workflowfiles.add(self.tool.tool["id"])
+ workflowfiles = []
+ jobfiles = []
+ workflowfiles.append({"class":"File", "location": self.tool.tool["id"]})
self.name = os.path.basename(self.tool.tool["id"])
def visitFiles(files, path):
- files.add(path)
- return path
+ files.append(path)
document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
+ loaded = set()
def loadref(b, u):
- return document_loader.fetch(urlparse.urljoin(b, u))
+ joined = urlparse.urljoin(b, u)
+ if joined not in loaded:
+ loaded.add(joined)
+ return document_loader.fetch(urlparse.urljoin(b, u))
+ else:
+ return {}
sc = scandeps(uri, workflowobj,
set(("$import", "run")),
- set(("$include", "$schemas", "path")),
+ set(("$include", "$schemas", "path", "location")),
loadref)
- adjustFiles(sc, partial(visitFiles, workflowfiles))
- adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+ adjustFileObjs(sc, partial(visitFiles, workflowfiles))
+ adjustFileObjs(self.job_order, partial(visitFiles, jobfiles))
+ keepprefix = kwargs.get("keepprefix", "")
workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
- "%s",
- "%s/%s",
+ keepprefix+"%s",
+ keepprefix+"%s/%s",
name=self.name,
**kwargs)
jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
- "%s",
- "%s/%s",
+ keepprefix+"%s",
+ keepprefix+"%s/%s",
name=os.path.basename(self.job_order.get("id", "#")),
**kwargs)
- adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+ def setloc(p):
+ p["location"] = jobmapper.mapper(p["location"])[1]
+ adjustFileObjs(self.job_order, setloc)
if "id" in self.job_order:
del self.job_order["id"]
def done(self, record):
if record["state"] == "Complete":
- processStatus = "success"
+ if record.get("exit_code") is not None:
+ if record["exit_code"] == 33:
+ processStatus = "UnsupportedRequirement"
+ elif record["exit_code"] == 0:
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
+ else:
+ processStatus = "success"
else:
processStatus = "permanentFail"
def keepify(path):
if not path.startswith("keep:"):
return "keep:%s/%s" % (record["output"], path)
+ else:
+ return path
adjustFiles(outputs, keepify)
except Exception as e:
logger.error("While getting final output object: %s", e)
self.arvrunner.output_callback(outputs, processStatus)
finally:
- del self.arvrunner.jobs[record["uuid"]]
+ del self.arvrunner.processes[record["uuid"]]