3 from functools import partial
7 from cStringIO import StringIO
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
16 import arvados.collection
17 import ruamel.yaml as yaml
19 from .arvdocker import arv_docker_get_image
20 from .pathmapper import ArvPathMapper
# Logger for this module, registered under the 'arvados.cwl-runner' name.
logger = logging.getLogger('arvados.cwl-runner')

# Override cwltool's module-level ACCEPTLIST_RE with a pattern that only
# matches non-empty strings made of ASCII letters, digits, '.', '_', '+'
# and '-'.
# NOTE(review): presumably cwltool consults this pattern when validating
# names -- confirm against cwltool.draft2tool before changing the set.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
def del_listing(obj):
    """Strip redundant fields from a File/Directory-style dict, in place.

    NOTE(review): the ``def`` line and both ``if`` bodies were missing from
    the damaged source.  The name is taken from the call
    ``adjustDirObjs(self.job_order, del_listing)`` elsewhere in this file;
    the two ``del`` statements are inferred from the conditions -- confirm.

    Args:
        obj: dict that may carry "location" and "listing" keys.
    """
    # An object already stored in Keep does not need its listing carried along.
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    # "_:" locations are dropped entirely.
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.

    Args:
        arvrunner: runner object handed to ArvPathMapper.
        name: label for the upload (forwarded to ArvPathMapper; see the
            review note on that call).
        document_loader: loader whose fetch_text() returns raw document text.
        workflowobj: document (dict) whose references are rewritten in place.
        uri: base URI given to scandeps for resolving references.
        loadref_run: when true, "run" fields are followed in addition to
            "$import".
    """
    # NOTE(review): the damaged source dropped several short lines from this
    # function.  Statements tagged "restored" below were re-supplied from
    # context and should be confirmed against the upstream file.

    loaded = set()  # restored: URLs already fetched by loadref

    def loadref(b, u):  # restored header; b = base URL, u = reference
        """Fetch and parse the raw document at `u` resolved against `b`."""
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)  # restored
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            # restored: a document that was already scanned contributes
            # nothing new.
            return {}

    if loadref_run:  # restored condition
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj  # restored default
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,  # restored
                  set(("$include", "$schemas", "location")),
                  loadref)  # restored

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        # The document itself is uploaded alongside its dependencies.
        sc.append({"class": "File", "location": workflowobj["id"]})

    # NOTE(review): only the first three arguments were visible; the two
    # pattern strings and name=name are restored -- confirm against
    # ArvPathMapper's signature.
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):  # restored header
        """Point a local location at its resolved Keep reference."""
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper  # restored; the docstring promises the pathmapper
def upload_docker(arvrunner, tool):
    """Fetch the Docker images a tool needs, recursing into workflow steps.

    For a CommandLineTool, looks up its DockerRequirement and passes it to
    arv_docker_get_image.  For a Workflow, applies itself to the embedded
    tool of each step.

    NOTE(review): the ``if docker_req:`` guard and the ``for`` header were
    missing from the damaged source and are restored from context (``s`` is
    otherwise unbound) -- confirm that ``tool.steps`` is the iterated
    attribute.

    Args:
        arvrunner: runner providing ``api`` and ``project_uuid``.
        tool: a CommandLineTool or cwltool Workflow; other types are ignored.
    """
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
class Runner(object):
    """Common state and completion handling for a submitted workflow run.

    NOTE(review): the damaged source dropped several short lines from this
    class.  Statements tagged "restored" were re-supplied from context and
    should be confirmed against the upstream file.
    """

    def __init__(self, runner, tool, job_order, enable_reuse):
        """Store the runner, the tool, its input object and the reuse flag."""
        self.arvrunner = runner
        self.tool = tool  # restored; self.tool is read by arvados_job_spec
        self.job_order = job_order
        # restored; not derivable from the visible lines -- TODO confirm
        self.running = False
        self.enable_reuse = enable_reuse
        # restored; not derivable from the visible lines -- TODO confirm
        self.uuid = None

    def update_pipeline_component(self, record):
        """Hook taking a record; does nothing here (body restored as pass)."""
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload Docker images and file dependencies for this run.

        Sets self.name from the tool id, rewrites references in the tool
        document and in self.job_order, removes the "id" key from
        self.job_order, and returns the pathmapper built for the tool
        document.
        """
        upload_docker(self.arvrunner, self.tool)

        self.name = os.path.basename(self.tool.tool["id"])

        # NOTE(review): the name, workflowobj and loadref_run arguments of
        # both calls were missing and are restored to match the
        # upload_dependencies signature -- confirm the True/False flags.
        workflowmapper = upload_dependencies(self.arvrunner,
                                             self.name,
                                             self.tool.doc_loader,
                                             self.tool.tool,
                                             self.tool.tool["id"],
                                             True)

        # The returned mapper is not used; the call is made for its in-place
        # rewrite of self.job_order.
        jobmapper = upload_dependencies(self.arvrunner,
                                        os.path.basename(self.job_order.get("id", "#")),
                                        self.tool.doc_loader,
                                        self.job_order,
                                        self.job_order.get("id", "#"),
                                        False)

        adjustDirObjs(self.job_order, del_listing)

        if "id" in self.job_order:
            del self.job_order["id"]

        return workflowmapper

    def done(self, record):
        """Translate a finished record into a status and report outputs.

        Reads "cwl.output.json" from the record's output collection,
        prefixes non-keep locations with the output collection, and calls
        arvrunner.output_callback(outputs, processStatus).  The record's
        uuid is removed from arvrunner.processes afterwards.

        Args:
            record: dict with "state", "output", "uuid" and optionally
                "exit_code".
        """
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                # No exit code recorded on a Complete record counts as success.
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        # restored: the callback below still needs a value if loading fails.
        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)

                def keepify(fileobj):
                    """Qualify a relative location with the output collection."""
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                # Best effort: log and still report the status to the callback.
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            # restored try/finally skeleton: always forget the finished process.
            del self.arvrunner.processes[record["uuid"]]