3 from functools import partial
7 from cStringIO import StringIO
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
16 import arvados.collection
17 import ruamel.yaml as yaml
19 from .arvdocker import arv_docker_get_image
20 from .pathmapper import ArvPathMapper
# Shared logger for this module; it logs under the 'arvados.cwl-runner' name.
logger = logging.getLogger('arvados.cwl-runner')

# Module-level side effect: replace cwltool's ACCEPTLIST_RE with a pattern that
# matches non-empty strings made only of ASCII letters, digits, '.', '_', '+'
# and '-'.
# NOTE(review): how cwltool.draft2tool consumes ACCEPTLIST_RE is not visible
# here -- confirm against cwltool before changing the character set.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.

    Arguments:
      arvrunner       -- handed unchanged to ArvPathMapper.
      name            -- handed to ArvPathMapper as its ``name`` keyword.
      document_loader -- object whose ``fetch_text(url)`` returns the raw
                         text of a referenced document.
      workflowobj     -- dict-like document; File/Directory "location"
                         values not already starting with "keep:" are
                         rewritten in place.
      uri             -- base URI handed to scandeps.
      loadref_run     -- when true, "run" references are followed in
                         addition to "$import".
    """
    # NOTE(review): the listing this was rebuilt from had dropped lines; the
    # nested function headers, the else-branches, the closing arguments of the
    # scandeps/ArvPathMapper calls and the final return were reconstructed
    # from context -- confirm against version control.

    # URLs (fragment stripped) that loadref has already fetched.
    loaded = set()

    def loadref(b, u):
        """Fetch and parse the raw document at ``u`` resolved against ``b``.

        Returns an empty dict for a document that was already loaded so the
        same file is not scanned twice.
        """
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg in loaded:
            return {}
        loaded.add(defrg)
        # Use fetch_text to get raw file (before preprocessing).
        text = document_loader.fetch_text(defrg)
        if isinstance(text, bytes):
            text = text.decode('utf-8')
        # Parse the text directly rather than wrapping it in a
        # cStringIO.StringIO: cStringIO cannot hold unicode text that is not
        # plain ASCII, and safe_load accepts strings as well as streams.
        return yaml.safe_load(text)

    if loadref_run:
        loadref_fields = {"$import", "run"}
    else:
        loadref_fields = {"$import"}

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  {"$include", "$schemas", "location"},
                  loadref)

    # Collect every File/Directory object found among the dependencies.
    files = []

    def visitFiles(path):
        files.append(path)

    adjustFileObjs(sc, visitFiles)
    adjustDirObjs(sc, visitFiles)

    normalizeFilesDirs(files)

    # The document itself is uploaded along with its dependencies.
    if "id" in workflowobj:
        files.append({"class": "File", "location": workflowobj["id"]})

    # NOTE(review): the two format strings and the name keyword below were on
    # dropped lines and are reconstructed -- verify against ArvPathMapper.
    mapper = ArvPathMapper(arvrunner, files, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Leave locations that already point into Keep untouched.
        if not p["location"].startswith("keep:"):
            p["location"] = mapper.mapper(p["location"]).resolved

    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
def upload_docker(arvrunner, tool):
    """Call arv_docker_get_image for the Docker requirement of ``tool``.

    For a CommandLineTool, its "DockerRequirement" feature (if any) is passed
    to arv_docker_get_image together with ``arvrunner.api`` and
    ``arvrunner.project_uuid``.  For a Workflow, the function recurses into
    the embedded tool of each step.  Any other kind of tool is ignored.
    """
    if isinstance(tool, CommandLineTool):
        # Only the requirement itself is needed; the second element of the
        # pair returned by get_feature is not used here.
        docker_req, _ = get_feature(tool, "DockerRequirement")
        # NOTE(review): this guard was on a dropped line and is reconstructed;
        # it skips tools that declare no DockerRequirement.
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        # NOTE(review): the loop header was on a dropped line; iterating
        # ``tool.steps`` is inferred from the use of ``s.embedded_tool``.
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
class Runner(object):
    """Base class holding a tool and its job order for submission to Arvados.

    It uploads Docker images and dependencies before submission and
    translates a finished record into a CWL process status plus output
    object, reported through ``arvrunner.output_callback``.
    """

    def __init__(self, runner, tool, job_order, enable_reuse):
        """Store the collaborators needed by the other methods.

        runner       -- object providing ``output_callback`` and the
                        ``processes`` mapping (kept as ``self.arvrunner``).
        tool         -- tool object; ``tool.tool`` and ``tool.doc_loader``
                        are read by arvados_job_spec.
        job_order    -- dict-like job order; mutated by arvados_job_spec.
        enable_reuse -- stored as-is; not read within this class.
        """
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        # NOTE(review): ``running`` and ``uuid`` were on dropped lines and are
        # not read in this class; presumably subclasses use them -- confirm.
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None

    def update_pipeline_component(self, record):
        """Hook with no behaviour in the base class."""
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload Docker images and file dependencies for this run.

        Sets ``self.name`` to the basename of the tool id, rewrites file
        references in both the tool document and the job order in place,
        removes any "id" key from the job order, and returns the path mapper
        produced for the tool document.
        """
        upload_docker(self.arvrunner, self.tool)

        self.name = os.path.basename(self.tool.tool["id"])

        # NOTE(review): the name, document and loadref_run arguments of both
        # calls were on dropped lines and are reconstructed from the
        # signature of upload_dependencies; in particular confirm the
        # True/False loadref_run values.
        workflowmapper = upload_dependencies(self.arvrunner,
                                             self.name,
                                             self.tool.doc_loader,
                                             self.tool.tool,
                                             self.tool.tool["id"],
                                             True)

        # Called for its in-place update of the job order; the returned
        # mapper is not needed.
        job_id = self.job_order.get("id", "#")
        upload_dependencies(self.arvrunner,
                            os.path.basename(job_id),
                            self.tool.doc_loader,
                            self.job_order,
                            job_id,
                            False)

        if "id" in self.job_order:
            del self.job_order["id"]

        return workflowmapper

    def done(self, record):
        """Report the outcome of a finished record to the runner.

        The status is "success" for a "Complete" record whose exit code is
        absent/None or 0, "UnsupportedRequirement" for exit code 33, and
        "permanentFail" otherwise.  The output object is read from
        ``cwl.output.json`` in the record's output collection; if that fails
        the error is logged and ``None`` is reported as the output.  The
        record's entry in ``arvrunner.processes`` is always removed.
        """
        if record["state"] != "Complete":
            processStatus = "permanentFail"
        else:
            exit_code = record.get("exit_code")
            if exit_code is None or exit_code == 0:
                processStatus = "success"
            elif exit_code == 33:
                processStatus = "UnsupportedRequirement"
            else:
                processStatus = "permanentFail"

        def keepify(fileobj):
            # Turn collection-relative paths into full keep: references.
            path = fileobj["location"]
            if not path.startswith("keep:"):
                fileobj["location"] = "keep:%s/%s" % (record["output"], path)

        # Must be bound before the try so the callback can still be invoked
        # when the output object cannot be loaded.
        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                # Best effort: a missing or unreadable output object must not
                # prevent the status from being reported.
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            # Drop the bookkeeping entry even if the callback raised.
            del self.arvrunner.processes[record["uuid"]]