from functools import partial
import logging
import json
+import re
+import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement
from cwltool.load_tool import fetch_document
+from cwltool.pathmapper import adjustFileObjs
import arvados.collection
logger = logging.getLogger('arvados.cwl-runner')
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse):
self.arvrunner = runner
def arvados_job_spec(self, *args, **kwargs):
self.upload_docker(self.tool)
- workflowfiles = set()
- jobfiles = set()
- workflowfiles.add(self.tool.tool["id"])
+ workflowfiles = []
+ jobfiles = []
+ workflowfiles.append({"class":"File", "location": self.tool.tool["id"]})
self.name = os.path.basename(self.tool.tool["id"])
def visitFiles(files, path):
- files.add(path)
- return path
+ files.append(path)
document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
loaded = set()
sc = scandeps(uri, workflowobj,
set(("$import", "run")),
- set(("$include", "$schemas", "path")),
+ set(("$include", "$schemas", "path", "location")),
loadref)
- adjustFiles(sc, partial(visitFiles, workflowfiles))
- adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+ adjustFileObjs(sc, partial(visitFiles, workflowfiles))
+ adjustFileObjs(self.job_order, partial(visitFiles, jobfiles))
keepprefix = kwargs.get("keepprefix", "")
workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
name=os.path.basename(self.job_order.get("id", "#")),
**kwargs)
- adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+ def setloc(p):
+ p["location"] = jobmapper.mapper(p["location"])[1]
+ adjustFileObjs(self.job_order, setloc)
if "id" in self.job_order:
del self.job_order["id"]
logger.error("While getting final output object: %s", e)
self.arvrunner.output_callback(outputs, processStatus)
finally:
- del self.arvrunner.jobs[record["uuid"]]
+ del self.arvrunner.processes[record["uuid"]]