3 from functools import partial
9 from StringIO import StringIO
11 from schema_salad.sourceline import SourceLine
13 import cwltool.draft2tool
14 from cwltool.draft2tool import CommandLineTool
15 import cwltool.workflow
16 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
17 from cwltool.load_tool import fetch_document
18 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
19 from cwltool.utils import aslist
20 from cwltool.builder import substitute
21 from cwltool.pack import pack
23 import arvados.collection
24 import ruamel.yaml as yaml
26 from .arvdocker import arv_docker_get_image
27 from .pathmapper import ArvPathMapper
28 from ._version import __version__
# Module-level logger shared by all arvados-cwl-runner components.
logger = logging.getLogger('arvados.cwl-runner')
# Override cwltool's filename acceptance regex to accept any name
# (Keep has no restriction that requires cwltool's default filtering).
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.

    Modifies obj in place; returns None.
    """
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        # Contents can be re-enumerated from Keep on demand, so the
        # listing is redundant for keep: references.
        del obj["listing"]
    if obj.get("location", "").startswith("_:"):
        # "_:" denotes an anonymous/literal Directory with no backing
        # storage location; only the listing carries its content.
        del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    # NOTE(review): this chunk appears to have lines elided (e.g. the
    # enclosing "def loadref(b, u):" header for the lines directly below,
    # several else: branches, and the final "return mapper") -- confirm
    # against the complete file before editing.
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                # Decode raw bytes so yaml receives unicode text.
                textIO = StringIO(text.decode('utf-8'))
                textIO = StringIO(text)
            return yaml.safe_load(textIO)

    # Fields that trigger recursive loading of referenced documents;
    # "run" is presumably only followed when loadref_run is true (the
    # if/else lines are not visible here).
    loadref_fields = set(("$import", "run"))
    loadref_fields = set(("$import",))

    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    # Scan the document for external references (imports, schemas, and
    # File/Directory locations).
    sc = scandeps(uri, scanobj,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    # Put scanned File/Directory entries into canonical form.
    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        # Also upload the primary document itself.
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        # Upload any referenced schema documents as well.
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    # Upload the collected dependencies to Keep and build the path mapper.
    mapper = ArvPathMapper(arvrunner, sc, "",
        # NOTE(review): the remaining ArvPathMapper arguments and the
        # "def setloc(p):" header appear to be elided from this chunk.
        # Rewrite in-document locations to their resolved keep: references,
        # leaving literal ("_:") and already-mapped ("keep:") ones alone.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        # Replace $schemas entries with their resolved keep references.
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch
def upload_docker(arvrunner, tool):
    """Visitor which uploads Docker images referenced in CommandLineTool objects.

    :param arvrunner: runner object providing .api and .project_uuid
    :param tool: any cwltool Process node; only CommandLineTool is acted on

    Raises UnsupportedRequirement if the DockerRequirement requests
    'dockerOutputDirectory', which the jobs API cannot support.
    """
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        # Guard: get_feature yields None when no DockerRequirement is
        # present; without this check .get() below raises AttributeError.
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""
    loader = tool.doc_loader
    workflow_id = tool.tool["id"]
    document = loader.fetch(workflow_id)
    return pack(loader, document, workflow_id, tool.metadata)
def tag_git_version(packed):
    # Annotate the packed workflow document with the git commit hash of its
    # source checkout, when one is available.
    # NOTE(review): "tool" is not defined in this function's scope (the
    # parameter is "packed"); as written this raises NameError when called --
    # confirm whether a tool argument was intended.
    if tool.tool["id"].startswith("file://"):
        # Strip the "file://" scheme to get the on-disk directory.
        path = os.path.dirname(tool.tool["id"][7:])
        # NOTE(review): the "try:" opening the except clause below is not
        # visible in this chunk.
            # Hash of the most recent commit on the current branch.
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            # NOTE(review): lines elided here; the assignment below is
            # presumably under an "else:" clause, not this except handler --
            # confirm against the complete file.
            packed["http://schema.org/version"] = githash
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # NOTE(review): this chunk has elided lines (empty suites and missing
    # call arguments below, and no visible return) -- confirm against the
    # complete file.

    # For each workflow input that declares secondaryFiles, expand the
    # patterns onto the corresponding File entries in the input object.
    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    # Derive secondary file locations from the primary
                    # location via the CWL secondaryFiles suffix rules.
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    # Upload the files referenced by the input object to Keep.
    jobmapper = upload_dependencies(arvrunner,
                                    job_order.get("id", "#"),

    if "id" in job_order:

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]
def upload_workflow_deps(arvrunner, tool):
    """Upload Docker images and file dependencies for each step of the workflow."""
    # Ensure that Docker images needed by this workflow are available
    tool.visit(partial(upload_docker, arvrunner))

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        # NOTE(review): the guard line(s) between this def and the call
        # below, and most of the upload_dependencies arguments, are not
        # visible in this chunk -- confirm against the complete file.
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                include_primary=False)
            # Cache the processed tool document back into the loader index.
            document_loader.idx[deptool["id"]] = deptool

    # Visit every embedded tool/step in the workflow.
    tool.visit(upload_tool_deps)
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it.

    :param arvrunner: runner object providing .api and .project_uuid
    :param img: Docker image name (e.g. "arvados/jobs:<version>")
    :returns: the image name, once confirmed available
    :raises Exception: when the image cannot be pulled/uploaded
    """
    try:
        # arv_docker_get_image pulls and uploads the image into the target
        # project as needed, raising on any failure.
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        # Wrap any failure with the image name for a clearer diagnostic.
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None):
        # NOTE(review): several attribute assignments and an "else:" line
        # appear to be elided from this chunk -- confirm against the
        # complete file.
        # Coordinating runner object (provides api, keep_client, callbacks).
        self.arvrunner = runner
        # The CWL input object for this run.
        self.job_order = job_order
        # Whether job/container reuse is permitted.
        self.enable_reuse = enable_reuse
        # Set by done() to the output collection identifier.
        self.final_output = None
        # Optional name/tags for the output collection.
        self.output_name = output_name
        self.output_tags = output_tags
        # Error-handling behavior passed through to the submitted runner.
        self.on_error = on_error
        # Docker image used to run arvados-cwl-runner; defaults to the
        # arvados/jobs image matching this package's version.
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__

        # RAM request for the runner process; defaults to 1024 when unset
        # (the else: line is elided here).
        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")
246 def update_pipeline_component(self, record):
    def done(self, record):
        """Base method for handling a completed runner.

        Maps the finished record's state/exit code to a process status,
        loads cwl.output.json from the output collection (if present),
        rewrites relative output locations to keep: references, and
        delivers the result via arvrunner.output_callback.
        """
        # NOTE(review): this chunk appears to be missing scaffolding lines
        # (the "try:" matched by the except clause below, several "else:"
        # lines, and presumably a "finally:") -- confirm against the
        # complete file.
            if record["state"] == "Complete":
                # Map the recorded exit code to a process status; 33 is
                # the cwltool convention for "unsupported requirement".
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                        processStatus = "permanentFail"
                    processStatus = "success"
                processStatus = "permanentFail"

            if processStatus == "permanentFail":
                # On failure, fetch the runner's log collection and emit
                # its tail to help diagnose the problem.
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            # Read the output collection to find the CWL output object.
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
            # Rewrite relative output paths to absolute keep: references
            # rooted at the output collection.
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            # Best-effort: report a permanent failure rather than crash.
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
            self.arvrunner.output_callback(outputs, processStatus)
            # Deregister this process from the active-process table.
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]