# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
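
# A sketch of the effect (hypothetical values): a File literal that was
# assigned an anonymous id such as
#   {"class": "File", "location": "_:9a2a...", "basename": "blub.txt", "contents": "blub"}
# is written back out as
#   {"class": "File", "basename": "blub.txt", "contents": "blub"}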

def remove_redundant_fields(obj):
    # Strip fields that can be recomputed from 'location' and 'basename'.
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    """Recursively visit the document and call op() on anything that has a 'default' field."""
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)
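
# For example (illustrative only), walking a parsed workflow document this
# would invoke op() on an input parameter such as
#   {"id": "#main/reads", "type": "File", "default": {"class": "File", "location": "blub.txt"}}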

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def capture_default(obj):
        remove = [False]
        def add_default(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Default file is missing: remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), add_default)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, capture_default)

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
144 if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
145 p["location"] = mapper.mapper(p["location"]).resolved
146 adjustFileObjs(workflowobj, setloc)
147 adjustDirObjs(workflowobj, setloc)
149 if "$schemas" in workflowobj:
151 for s in workflowobj["$schemas"]:
152 sch.append(mapper.mapper(s).resolved)
153 workflowobj["$schemas"] = sch

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {}
    for k,v in rewrites.items():
        rewrite_to_orig[v] = k

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id][v["location"]]
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed
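
# pack() rewrites document ids when it combines files into a single $graph
# (e.g. "file:///home/me/tool.cwl" may become "#tool.cwl"); rewrite_to_orig
# inverts that mapping so the locations recorded in merged_map, which is keyed
# by the original ids, can still be applied to the packed copy.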

def tag_git_version(packed, tool):
    # Record the git commit hash of the workflow source directory, if available.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def discover_secondary_files(inputs, job_order):
    # Fill in 'secondaryFiles' on File inputs based on the tool's input schema.
    for t in inputs:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])
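
# cwltool.builder.substitute applies the CWL secondaryFiles suffix rules: a
# plain suffix is appended, and each leading "^" strips one extension first.
# For example (illustrative):
#   substitute("keep:<pdh>/reads.bam", ".bai")   -> "keep:<pdh>/reads.bam.bai"
#   substitute("keep:<pdh>/reads.bam", "^.bai")  -> "keep:<pdh>/reads.bai"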

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = toolmap

    tool.visit(upload_tool_deps)

    return merged_map
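
# The returned merged_map is keyed by document id, mapping each original file
# reference in that document to its uploaded keep: location, e.g. (illustrative):
#   {"file:///home/me/wf.cwl": {"file:///home/me/blub.txt": "keep:<pdh>/blub.txt"}}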

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
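
# Illustrative use (hypothetical names): storing the same packed document twice
# finds the existing collection by content address instead of re-uploading.
#
#   pdh = upload_workflow_collection(runner, "my workflow", packed)
#   # pdh is a portable data hash, e.g. "<md5>+<size>"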

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

        self.merged_map = merged_map or {}

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]