from functools import partial
import logging
import json
+import re
+import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
from cwltool.load_tool import fetch_document
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
import arvados.collection
# Module-level logger, registered under the name 'arvados.cwl-runner'.
logger = logging.getLogger('arvados.cwl-runner')
# Rebind cwltool.draft2tool.ACCEPTLIST_RE to a pattern that accepts only
# strings made of ASCII letters, digits, '.', '_', '+' and '-'.
# NOTE(review): this assigns to an attribute of another module at import time,
# so it affects every user of cwltool.draft2tool in the process; what cwltool
# uses this pattern for is not visible here -- confirm against cwltool.
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse):
# Keep the runner object; other code in this class reaches it as
# self.arvrunner (path mapper construction, output callback, bookkeeping).
# NOTE(review): no assignment of tool, job_order or enable_reuse is visible
# here, although self.tool and self.job_order are read by other methods --
# confirm they are stored in lines not shown.
self.arvrunner = runner
def arvados_job_spec(self, *args, **kwargs):
# Collects the file/directory objects referenced by the workflow document and
# by the job order, hands them to path mappers, then rewrites the job order's
# "location" fields with the mapped values.
# NOTE(review): `loadref`, `jobmapper` and `os` are used below but are not
# defined or imported in the lines visible here -- confirm they exist.
self.upload_docker(self.tool)
# The collections change from sets of path strings to lists of objects; the
# workflow list is seeded with a File object located at the tool's own id.
- workflowfiles = set()
- jobfiles = set()
- workflowfiles.add(self.tool.tool["id"])
+ workflowfiles = []
+ jobfiles = []
+ workflowfiles.append({"class":"File", "location": self.tool.tool["id"]})
self.name = os.path.basename(self.tool.tool["id"])
# Collector callback: records each visited item in `files`. The added version
# appends and returns nothing, where the removed version added to a set and
# returned the path unchanged.
def visitFiles(files, path):
- files.add(path)
- return path
+ files.append(path)
document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
loaded = set()
# The second key set passed to scandeps gains "location" alongside "path".
# NOTE(review): presumably the two sets are the reference fields to follow and
# the fields holding file URLs -- confirm against cwltool.process.scandeps.
sc = scandeps(uri, workflowobj,
set(("$import", "run")),
- set(("$include", "$schemas", "path")),
+ set(("$include", "$schemas", "path", "location")),
loadref)
# Run the collector over both the scanned dependencies and the job order, once
# through adjustFileObjs and once through adjustDirObjs.
# NOTE(review): presumably these visit File and Directory objects
# respectively -- confirm against cwltool.pathmapper.
- adjustFiles(sc, partial(visitFiles, workflowfiles))
- adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+ adjustFileObjs(sc, partial(visitFiles, workflowfiles))
+ adjustFileObjs(self.job_order, partial(visitFiles, jobfiles))
+ adjustDirObjs(sc, partial(visitFiles, workflowfiles))
+ adjustDirObjs(self.job_order, partial(visitFiles, jobfiles))
+
# NOTE(review): both collected lists are passed to normalizeFilesDirs before
# mapping; its effect on the objects is not visible here -- confirm.
+ normalizeFilesDirs(jobfiles)
+ normalizeFilesDirs(workflowfiles)
keepprefix = kwargs.get("keepprefix", "")
workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
name=os.path.basename(self.job_order.get("id", "#")),
**kwargs)
# The removed lambda mapped a bare path string; the added helper instead
# mutates each object in place, replacing its "location" with element [1] of
# the jobmapper.mapper() result for the old location.
- adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+ def setloc(p):
+ p["location"] = jobmapper.mapper(p["location"])[1]
+ adjustFileObjs(self.job_order, setloc)
+ adjustDirObjs(self.job_order, setloc)
# Drop the "id" key from the job order when it is present.
if "id" in self.job_order:
del self.job_order["id"]
outc = arvados.collection.Collection(record["output"])
with outc.open("cwl.output.json") as f:
outputs = json.load(f)
# keepify rewrites output references so they point into the output collection:
# a location that does not already start with "keep:" becomes
# "keep:<record["output"]>/<location>"; one that does is left as it is.
# The removed version took a path string and returned the new string; the
# added version takes the object and updates its "location" key in place.
# NOTE(review): fileobj["location"] assumes every visited object carries a
# "location" key -- confirm this holds for objects read from cwl.output.json.
- def keepify(path):
+ def keepify(fileobj):
+ path = fileobj["location"]
if not path.startswith("keep:"):
- return "keep:%s/%s" % (record["output"], path)
- else:
- return path
- adjustFiles(outputs, keepify)
+ fileobj["location"] = "keep:%s/%s" % (record["output"], path)
# The same rewrite is applied through both adjustFileObjs and adjustDirObjs.
+ adjustFileObjs(outputs, keepify)
+ adjustDirObjs(outputs, keepify)
except Exception as e:
logger.error("While getting final output object: %s", e)
self.arvrunner.output_callback(outputs, processStatus)
finally:
- del self.arvrunner.jobs[record["uuid"]]
+ del self.arvrunner.processes[record["uuid"]]