3 from functools import partial
8 import cwltool.draft2tool
9 from cwltool.draft2tool import CommandLineTool
10 import cwltool.workflow
11 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
12 from cwltool.load_tool import fetch_document
13 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
15 import arvados.collection
17 from .arvdocker import arv_docker_get_image
18 from .pathmapper import ArvPathMapper
# Package-wide logger; other modules in arvados-cwl-runner log under the
# same 'arvados.cwl-runner' name.
logger = logging.getLogger('arvados.cwl-runner')

# Monkeypatch cwltool's accepted-filename pattern so Arvados can handle a
# wider set of input file names than cwltool's default allows.
# NOTE(review): this mutates cwltool module state globally — confirm no other
# cwltool consumer in the process depends on the default pattern.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
def __init__(self, runner, tool, job_order, enable_reuse):
    """Track one top-level workflow run submitted through Arvados.

    :param runner: the coordinating runner object; provides ``.api``,
        ``.project_uuid``, ``.output_callback`` and ``.processes`` used by
        the other methods of this class.
    :param tool: the cwltool Process for the workflow being run
        (``arvados_job_spec`` reads ``self.tool.tool["id"]``).
    :param job_order: the CWL input object; mutated in place when input
        file locations are remapped.
    :param enable_reuse: whether Arvados may reuse results of past jobs.
    """
    self.arvrunner = runner
    # Fix: the tool was never stored, but upload_docker(self.tool) and
    # arvados_job_spec() both depend on self.tool being set here.
    self.tool = tool
    self.job_order = job_order
    self.enable_reuse = enable_reuse
def update_pipeline_component(self, record):
    """Hook invoked when the state of the underlying job/pipeline changes.

    The base implementation is a no-op; subclasses override this to
    mirror *record* into their pipeline representation.
    """
    # NOTE(review): no body was visible in the source; treating this as an
    # intentional no-op hook — confirm against subclasses before relying on it.
    pass
def upload_docker(self, tool):
    """Make sure every Docker image referenced by *tool* is available.

    For a CommandLineTool with a DockerRequirement, resolves/uploads the
    image via arv_docker_get_image; for a Workflow, recurses into each
    step's embedded tool.
    """
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        # Only resolve an image when the tool actually declares one; the
        # mangled source called arv_docker_get_image unconditionally, which
        # would pass docker_req=None for tools without a DockerRequirement.
        if docker_req:
            arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        # A workflow has no image of its own; check each step's tool.
        # Fix: restores the missing loop binding for `s`.
        for s in tool.steps:
            self.upload_docker(s.embedded_tool)
def arvados_job_spec(self, *args, **kwargs):
    """Stage the workflow and its inputs into Keep and return the mapper.

    Uploads Docker images, scans the workflow document for file/directory
    dependencies, uploads workflow files and job-order files via
    ArvPathMapper, rewrites the job order's file locations to their mapped
    (keep:) locations, and returns the workflow's path mapper.

    Side effects: sets ``self.name``; mutates ``self.job_order`` in place
    (locations rewritten, top-level "id" removed).

    NOTE(review): several interior lines were missing from the visible
    source; the list initializations, loadref caching, and ArvPathMapper
    keep-prefix arguments below are reconstructed — confirm against the
    project history.
    """
    self.upload_docker(self.tool)

    workflowfiles = []
    jobfiles = []
    # The workflow document itself must be uploaded along with anything
    # it references.
    workflowfiles.append({"class":"File", "location": self.tool.tool["id"]})

    self.name = os.path.basename(self.tool.tool["id"])

    def visitFiles(files, path):
        # Collector passed to adjustFileObjs/adjustDirObjs below.
        files.append(path)

    document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
    loaded = set()
    def loadref(b, u):
        # Fetch each referenced document at most once; scandeps may visit
        # the same reference from several places.
        joined = urlparse.urljoin(b, u)
        if joined not in loaded:
            loaded.add(joined)
            return document_loader.fetch(urlparse.urljoin(b, u))
        else:
            return {}

    # Walk the workflow document collecting $import/run references and
    # $include/$schemas/path/location file references.
    sc = scandeps(uri, workflowobj,
                  set(("$import", "run")),
                  set(("$include", "$schemas", "path", "location")),
                  loadref)
    adjustFileObjs(sc, partial(visitFiles, workflowfiles))
    adjustFileObjs(self.job_order, partial(visitFiles, jobfiles))
    adjustDirObjs(sc, partial(visitFiles, workflowfiles))
    adjustDirObjs(self.job_order, partial(visitFiles, jobfiles))

    normalizeFilesDirs(jobfiles)
    normalizeFilesDirs(workflowfiles)

    keepprefix = kwargs.get("keepprefix", "")
    workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                   keepprefix+"%s",
                                   keepprefix+"%s/%s",
                                   name=self.name,
                                   **kwargs)

    jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                              keepprefix+"%s",
                              keepprefix+"%s/%s",
                              name=os.path.basename(self.job_order.get("id", "#")),
                              **kwargs)

    def setloc(p):
        # Rewrite each input's location to its uploaded (target) location.
        p["location"] = jobmapper.mapper(p["location"])[1]
    adjustFileObjs(self.job_order, setloc)
    adjustDirObjs(self.job_order, setloc)

    # The document id would confuse the runner's input handling once the
    # locations have been rewritten.
    if "id" in self.job_order:
        del self.job_order["id"]

    return workflowmapper
def done(self, record):
    """Completion callback for the runner job.

    Derives a process status from *record*, loads the final output object
    (cwl.output.json) from the output collection, rewrites relative output
    locations to keep: URIs, reports the result via the runner's output
    callback, and always removes this process from the runner's table.

    Fix: restores the missing else-branches (the visible source assigned
    processStatus unconditionally, so later assignments clobbered earlier
    ones) and the try/finally framing so bookkeeping always happens.
    """
    if record["state"] == "Complete":
        if record.get("exit_code") is not None:
            # NOTE(review): exit code 33 appears to be the project's
            # convention for UnsupportedRequirement — confirm.
            if record["exit_code"] == 33:
                processStatus = "UnsupportedRequirement"
            elif record["exit_code"] == 0:
                processStatus = "success"
            else:
                processStatus = "permanentFail"
        else:
            # Complete with no exit code recorded: treat as success.
            processStatus = "success"
    else:
        processStatus = "permanentFail"

    outputs = None
    try:
        try:
            outc = arvados.collection.Collection(record["output"])
            with outc.open("cwl.output.json") as f:
                outputs = json.load(f)
            def keepify(fileobj):
                # Relative locations in the output object are relative to
                # the output collection; qualify them as keep: URIs.
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            # Best-effort: report the failure but still invoke the
            # callback (with outputs possibly None) below.
            logger.error("While getting final output object: %s", e)
        self.arvrunner.output_callback(outputs, processStatus)
    finally:
        # Always drop this process from the runner's table, even if the
        # output callback raises.
        del self.arvrunner.processes[record["uuid"]]