3 from functools import partial
7 from cStringIO import StringIO
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
15 from cwltool.utils import aslist
16 from cwltool.builder import substitute
18 import arvados.collection
19 import ruamel.yaml as yaml
21 from .arvdocker import arv_docker_get_image
22 from .pathmapper import ArvPathMapper
23 from ._version import __version__
# Module-level logger shared by the arvados-cwl-runner code in this file.
logger = logging.getLogger('arvados.cwl-runner')

# Monkey-patch cwltool to accept any name (r".*" matches everything);
# presumably relaxes cwltool's stricter default accept list — TODO confirm
# exactly which names this regex gates in the pinned cwltool version.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. a submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.
    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        # The listing can be re-enumerated from Keep on demand.
        del obj["listing"]
    if obj.get("location", "").startswith("_:"):
        # "_:" marks an ephemeral/literal directory with no stable address;
        # only its listing is meaningful, so drop the location.
        del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        # Resolve the reference relative to its base and load each distinct
        # document only once; already-seen documents contribute nothing new.
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    # Only follow "run" references when scanning a workflow document.
    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        # The workflow document itself must also be uploaded.
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Rewrite local references to the keep: locations chosen by the
        # mapper; leave literals ("_:") and existing keep: references alone.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
def upload_docker(arvrunner, tool):
    """Ensure the Docker image(s) referenced by `tool` are available to Arvados.

    Recurses into Workflow steps; for a CommandLineTool, resolves and uploads
    the image named by its DockerRequirement (if any) via arv_docker_get_image.

    Raises UnsupportedRequirement for 'dockerOutputDirectory', which the jobs
    API cannot honor.
    """
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        # get_feature returns (None, None) when the requirement/hint is
        # absent; guard before dereferencing.
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
def upload_instance(arvrunner, name, tool, job_order):
    """Upload Docker image, workflow dependencies and job order to Keep.

    Expands declared secondaryFiles patterns onto File inputs in job_order,
    uploads the workflow document and the job order separately, and returns
    the pathmapper for the workflow document.
    """
    upload_docker(arvrunner, tool)

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            # Attach the input's declared secondaryFiles patterns to File
            # objects that do not already carry an explicit list.
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for entry in fileobj:
                    setSecondary(entry)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)
    jobmapper = upload_dependencies(arvrunner,
                                    os.path.basename(job_order.get("id", "#")),
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        # The "id" was only needed to resolve references; drop it so it is
        # not treated as a workflow input parameter.
        del job_order["id"]

    return workflowmapper
def arvados_jobs_image(arvrunner):
    """Determine if the arvados/jobs image for this version of arvados-cwl-runner
    is available, and return its name."""

    img = "arvados/jobs:"+__version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        # Surface the image name in the error so the operator knows what to pull.
        raise Exception("Docker image %s is not available\n%s" % (img, e) )
    return img
class Runner(object):
    """Base class for processes that submit an instance of arvados-cwl-runner
    and report its final output.

    Subclasses provide the actual submission mechanism; this class holds the
    shared state (tool, job order, reuse flag, runner RAM) and the `done`
    callback that collects cwl.output.json from the output collection.
    """

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags

        # Default the runner process to 1024 MiB unless the caller overrides.
        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        # Subclasses may override to reflect job state into a pipeline
        # instance; the base class has nothing to update.
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload the tool instance and return the workflow pathmapper."""
        self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])

        # Need to filter this out, gets added by cwltool when providing
        # parameters on the command line.
        if "job_order" in self.job_order:
            del self.job_order["job_order"]

        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
        """Handle a finished runner record: compute status, fetch outputs,
        invoke the output callback, and deregister the process."""
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                # Exit code 33 is the inner cwl-runner's signal for an
                # unsupported requirement.
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                self.final_output = record["output"]
                outc = arvados.collection.CollectionReader(self.final_output,
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                if "cwl.output.json" in outc:
                    with outc.open("cwl.output.json") as f:
                        outputs = json.load(f)
                def keepify(fileobj):
                    # Rewrite relative paths as keep: references into the
                    # output collection.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                # Best effort: report status even if output retrieval failed.
                logger.exception("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]