3 from functools import partial
7 from cStringIO import StringIO
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
16 import arvados.collection
17 import ruamel.yaml as yaml
19 from .arvdocker import arv_docker_get_image
20 from .pathmapper import ArvPathMapper
# Logger used by the helpers and Runner class in this module.
logger = logging.getLogger("arvados.cwl-runner")

# Replace cwltool's ACCEPTLIST_RE with a pattern that matches strings made up
# only of ASCII letters, digits, '.', '_', '+' and '-'.
# NOTE(review): presumably widened so that '+' (as in Keep locators) is
# accepted by cwltool's name checks -- confirm against the cwltool version used.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.

    :param arvrunner: runner object handed to ArvPathMapper.
    :param name: value passed to ArvPathMapper as ``name``.
    :param document_loader: loader whose ``fetch_text()`` reads raw documents.
    :param workflowobj: document to scan; the "location" of every File and
        Directory object in it is rewritten in place.
    :param uri: base URI handed to scandeps.
    :param loadref_run: when true, "run" fields are followed as references in
        addition to "$import".
    :returns: the ArvPathMapper built from the discovered dependencies.
    """

    loaded = set()

    def loadref(b, u):
        # Resolve u against b and parse the raw document; each document is
        # parsed at most once, later references to it yield {}.
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg in loaded:
            return {}
        loaded.add(defrg)
        # Use fetch_text to get raw file (before preprocessing).
        text = document_loader.fetch_text(defrg)
        if isinstance(text, bytes):
            text = text.decode('utf-8')
        # Parse the text directly instead of wrapping it in
        # cStringIO.StringIO, which cannot hold unicode with non-ASCII
        # characters and broke loading of such documents.
        return yaml.safe_load(text)

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        # Include the document itself in the paths handed to the mapper.
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Leave "_:" and already-"keep:" locations untouched; map the rest.
        if not p["location"].startswith(("_:", "keep:")):
            p["location"] = mapper.mapper(p["location"]).target
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
def upload_docker(arvrunner, tool):
    """Hand the Docker requirements of ``tool`` to arv_docker_get_image().

    For a CommandLineTool that has a DockerRequirement, the requirement is
    passed to arv_docker_get_image() together with ``arvrunner.api`` and
    ``arvrunner.project_uuid``.  For a Workflow, every step's embedded tool is
    processed recursively.  Any other kind of tool is ignored.
    """
    if isinstance(tool, CommandLineTool):
        docker_req, _ = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        return

    if isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
class Runner(object):
    """Holds a CWL tool and job order run through an Arvados runner.

    ``arvados_job_spec()`` uploads the Docker images and file dependencies of
    the tool and job order; ``done()`` turns a finished process record into
    CWL outputs and a status reported to ``arvrunner.output_callback``.
    """

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        # NOTE(review): presumably updated by subclasses once the process has
        # been submitted -- confirm.
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None

    def update_pipeline_component(self, record):
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload Docker images and dependencies for the tool and job order.

        Sets ``self.name`` from the tool document id, rewrites File/Directory
        locations in the tool document and the job order in place (via
        upload_dependencies), removes the job order's "id" key, and returns
        the path mapper built for the tool document.
        """
        upload_docker(self.arvrunner, self.tool)

        self.name = os.path.basename(self.tool.tool["id"])

        workflowmapper = upload_dependencies(self.arvrunner,
                                             self.name,
                                             self.tool.doc_loader,
                                             self.tool.tool,
                                             self.tool.tool["id"],
                                             True)

        # Called for its side effects (upload + in-place location rewrite);
        # the job order's mapper is not needed afterwards.
        job_order_id = self.job_order.get("id", "#")
        upload_dependencies(self.arvrunner,
                            os.path.basename(job_order_id),
                            self.tool.doc_loader,
                            self.job_order,
                            job_order_id,
                            False)

        # Drop the job order's "id" once its dependencies have been uploaded.
        self.job_order.pop("id", None)

        return workflowmapper

    def done(self, record):
        """Report the outputs and final status of a finished process record.

        The CWL status is "permanentFail" unless ``record["state"]`` is
        "Complete"; a complete record maps exit code 33 to
        "UnsupportedRequirement", 0 or a missing exit code to "success", and
        anything else to "permanentFail".  When the record names an output
        collection, its "cwl.output.json" is loaded and relative locations are
        rewritten to "keep:<output>/<path>".  ``(outputs, status)`` is passed
        to ``arvrunner.output_callback`` (outputs is None when nothing could be
        loaded), and the record's uuid is always removed from
        ``arvrunner.processes``.
        """
        if record["state"] != "Complete":
            processStatus = "permanentFail"
        elif record.get("exit_code") is None:
            processStatus = "success"
        elif record["exit_code"] == 33:
            processStatus = "UnsupportedRequirement"
        elif record["exit_code"] == 0:
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            # Records without an output collection (e.g. failed runs) have
            # nothing to load; skip instead of logging a spurious error.
            if record.get("output"):
                try:
                    outc = arvados.collection.Collection(record["output"])
                    with outc.open("cwl.output.json") as f:
                        outputs = json.load(f)

                    def keepify(fileobj):
                        path = fileobj["location"]
                        if not path.startswith("keep:"):
                            fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                    adjustFileObjs(outputs, keepify)
                    adjustDirObjs(outputs, keepify)
                except Exception as e:
                    # Best effort: report what we have, but keep the traceback.
                    logger.exception("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]