3 from functools import partial
8 import cwltool.draft2tool
9 from cwltool.draft2tool import CommandLineTool
10 import cwltool.workflow
11 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
12 from cwltool.load_tool import fetch_document
13 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
15 import arvados.collection
17 from .arvdocker import arv_docker_get_image
18 from .pathmapper import ArvPathMapper
# Module-level logger shared by everything in this runner module.
logger = logging.getLogger('arvados.cwl-runner')
# Override cwltool's filename-acceptance regex so that file names made of
# alphanumerics and ._+- are accepted.  NOTE(review): this monkey-patches
# module-global state in cwltool.draft2tool, so it affects every tool
# constructed after this module is imported.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
# Scan a CWL document for external file/directory dependencies, upload them
# to Keep through ArvPathMapper, and rewrite each object's "location" to the
# mapped Keep target.  Returns the mapper (see the visible `mapper = ...`).
# NOTE(review): this block is truncated in this excerpt — the nested
# `def loadref(b, u):` line, the `else:` branches, the full scandeps() and
# ArvPathMapper() argument lists, and the `visitFiles`/`setloc` definitions
# are not visible here; confirm against the full file before editing.
def upload_dependencies(arvrunner, name, document_loader,
workflowobj, uri, keepprefix, loadref_run):
# Resolve u against base b; fetch only documents not already seen
# (presumably a surrounding `loaded` set prevents re-fetching — confirm).
joined = urlparse.urljoin(b, u)
if joined not in loaded:
return document_loader.fetch(urlparse.urljoin(b, u))
# When loadref_run is true, follow "run" references as well as "$import";
# otherwise only "$import" is traversed.
loadref_fields = set(("$import", "run"))
loadref_fields = set(("$import",))
# Collect every dependency reachable from workflowobj via the fields above.
sc = scandeps(uri, workflowobj,
set(("$include", "$schemas", "path", "location")),
# Gather the discovered File/Directory objects, then normalize them.
adjustFileObjs(sc, visitFiles)
adjustDirObjs(sc, visitFiles)
normalizeFilesDirs(files)
# The document itself is also a dependency that must be uploaded.
if "id" in workflowobj:
files.append({"class": "File", "location": workflowobj["id"]})
# Upload everything and obtain the local-path -> Keep-path mapping.
mapper = ArvPathMapper(arvrunner, files, "",
# Rewrite each object's location in place to its mapped Keep target.
p["location"] = mapper.mapper(p["location"]).target
adjustFileObjs(workflowobj, setloc)
adjustDirObjs(workflowobj, setloc)
def upload_docker(arvrunner, tool):
    """Ensure the Docker image(s) required by `tool` exist in Arvados.

    For a CommandLineTool, look up its DockerRequirement and, when one is
    declared, pull the image into the runner's project.  For a Workflow,
    recurse into every step's embedded tool so nested tools are covered.
    Returns None; operates purely by side effect.
    """
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        # Only fetch when the tool actually declares a DockerRequirement;
        # the visible excerpt called arv_docker_get_image unconditionally,
        # which would pass a falsy docker_req for tools without one.
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        # The excerpt referenced `s` with no loop — iterate the steps.
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
81 def __init__(self, runner, tool, job_order, enable_reuse):
82 self.arvrunner = runner
84 self.job_order = job_order
86 self.enable_reuse = enable_reuse
89 def update_pipeline_component(self, record):
# Prepare this run for submission: upload the Docker image(s) and every file
# dependency of both the tool document and the job order, then strip the
# job order's "id" so it is not submitted as an input.  Returns the
# workflow's path mapper.
# NOTE(review): truncated in this excerpt — several positional arguments of
# both upload_dependencies() calls are not visible; confirm before editing.
def arvados_job_spec(self, *args, **kwargs):
# Make sure required container images exist in Arvados first.
upload_docker(self.arvrunner, self.tool)
# Human-readable name derived from the tool document's id.
self.name = os.path.basename(self.tool.tool["id"])
# Upload the workflow document's dependencies (follows "run" refs,
# presumably — the trailing flag argument is not visible here).
workflowmapper = upload_dependencies(self.arvrunner,
self.tool.tool["id"],
kwargs.get("keepprefix", ""),
# Upload the job order's own file dependencies, keyed by its id (or "#").
jobmapper = upload_dependencies(self.arvrunner,
os.path.basename(self.job_order.get("id", "#")),
self.tool.doc_loader,
self.job_order.get("id", "#"),
kwargs.get("keepprefix", ""),
# Drop "id" so it is not treated as a workflow input downstream.
if "id" in self.job_order:
del self.job_order["id"]
return workflowmapper
# Completion callback for the runner job: translate the Arvados job record
# into a CWL process status, load cwl.output.json from the output
# collection, rewrite relative output paths to keep: URIs, report via
# output_callback, and deregister the process.
# NOTE(review): truncated in this excerpt — the `else:` lines of the status
# ladder, the `try:` that pairs with the visible `except`, and (presumably)
# a `finally:` before the final `del` are not visible; confirm against the
# full file before editing.
def done(self, record):
if record["state"] == "Complete":
if record.get("exit_code") is not None:
# Exit code 33 is used to signal an UnsupportedRequirement error
# (see the UnsupportedRequirement import at the top of the file).
if record["exit_code"] == 33:
processStatus = "UnsupportedRequirement"
elif record["exit_code"] == 0:
processStatus = "success"
processStatus = "permanentFail"
processStatus = "success"
processStatus = "permanentFail"
# Read the structured CWL output object from the output collection.
outc = arvados.collection.Collection(record["output"])
with outc.open("cwl.output.json") as f:
outputs = json.load(f)
# Rewrite any location that is not already a keep: URI so it points
# into the job's output collection.
def keepify(fileobj):
path = fileobj["location"]
if not path.startswith("keep:"):
fileobj["location"] = "keep:%s/%s" % (record["output"], path)
adjustFileObjs(outputs, keepify)
adjustDirObjs(outputs, keepify)
except Exception as e:
# Best-effort: log and still deliver whatever status we computed.
logger.error("While getting final output object: %s", e)
self.arvrunner.output_callback(outputs, processStatus)
# Deregister this process from the runner's active-process table.
del self.arvrunner.processes[record["uuid"]]