import os
import re
import json
import logging
import urlparse
from functools import partial
from cStringIO import StringIO
import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
logger = logging.getLogger('arvados.cwl-runner')

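# Module-level override of the pattern cwltool uses to decide which
# input file names are acceptable as-is.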
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
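    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.
    Also does an in-place update of references in workflowobj.
    """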
    loaded = set()
    def loadref(b, u):
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))
50 if "id" in workflowobj:
51 # Need raw file content (before preprocessing) to ensure
52 # that external references in $include and $mixin are captured.
53 scanobj = loadref("", workflowobj["id"])
55 sc = scandeps(uri, scanobj,
57 set(("$include", "$schemas")),
64 adjustFileObjs(sc, visitFiles)
65 adjustDirObjs(sc, visitFiles)
67 normalizeFilesDirs(files)
69 if "id" in workflowobj:
70 files.append({"class": "File", "location": workflowobj["id"]})
    mapper = ArvPathMapper(arvrunner, files, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        p["location"] = mapper.mapper(p["location"]).target
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper


def upload_docker(arvrunner, tool):
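    """Make the Docker images named by DockerRequirements available in Arvados.

    Recurses into workflow steps so every embedded tool is covered.
    """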
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


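# Base class for the runner process, which submits the workflow run and
# delivers its final output back through output_callback.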
class Runner(object):
    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        # Intentionally a no-op in the base class.
        pass

    def arvados_job_spec(self, *args, **kwargs):
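        """Upload Docker images and file dependencies for this workflow run.

        Returns the pathmapper covering the uploaded workflow document.
        """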
        upload_docker(self.arvrunner, self.tool)

        self.name = os.path.basename(self.tool.tool["id"])

        workflowmapper = upload_dependencies(self.arvrunner,
                                             self.name,
                                             self.tool.doc_loader,
                                             self.tool.tool,
                                             self.tool.tool["id"],
                                             True)

        jobmapper = upload_dependencies(self.arvrunner,
                                        os.path.basename(self.job_order.get("id", "#")),
                                        self.tool.doc_loader,
                                        self.job_order,
                                        self.job_order.get("id", "#"),
                                        False)

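        # The "id" was only needed to resolve and upload the job order
        # document itself; drop it so it is not mistaken for an input.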
126 if "id" in self.job_order:
127 del self.job_order["id"]
129 return workflowmapper
    def done(self, record):
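        """Completion callback: translate the finished job record into a
        CWL process status and deliver the final output object."""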
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                # Exit code 33 is treated as an UnsupportedRequirement error.
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

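        # Fetch cwl.output.json from the output collection; on failure the
        # error is logged and a None output is reported.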
        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
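                # Rewrite bare output paths as keep: references into the
                # output collection.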
                def keepify(fileobj):
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]