# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__

from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

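# Strip fields that cwltool derives from 'location' so host-specific values
# are not written into the saved document.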
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

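# Recursively walk lists and dicts, calling op() on every dict that carries a
# "default" key.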
def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """
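    # loadref parses each referenced document once; anything already seen
    # returns an empty dict so scandeps does not traverse it twice.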
    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}
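    # Follow "run" references only when the caller requests it; $import is
    # always followed.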
    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))
    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)
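    # sc now holds a File/Directory record for each external reference that
    # needs to be uploaded to Keep.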
    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})
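    # File/Directory literals appearing in "default" fields are uploaded too,
    # but defaults that point at nonexistent local paths are dropped rather
    # than causing the upload to fail.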
    def capture_default(obj):
        remove = [False]
        def add_default(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), add_default)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, capture_default)
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
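    # Rewrite every File/Directory location in the document to the Keep
    # reference chosen by the path mapper; anonymous "_:" literals and
    # existing keep: references are left alone.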
    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)
    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {}
    for k, v in rewrites.items():
        rewrite_to_orig[v] = k

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id][v["location"]]
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed

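# Record the git commit of a workflow loaded from a local checkout under
# "http://schema.org/version" in the packed document.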
def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

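# Fill in missing secondaryFiles on File inputs in the job order, using the
# secondaryFiles patterns declared on the corresponding workflow inputs.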
def discover_secondary_files(inputs, job_order):
    for t in inputs:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

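# Upload the dependencies of every tool referenced by the workflow and return
# a map of {tool id: {original location: keep reference}} for use when
# packing the workflow.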
def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = toolmap

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

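# Save the packed workflow as "workflow.cwl" in a Keep collection, reusing an
# existing collection with the same content in the target project when
# possible.  Returns the collection's portable data hash.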
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None, priority=None,
                 secret_store=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order

        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse

        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

        self.merged_map = merged_map or {}

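    # Hook for subclasses; the base runner has no pipeline component to
    # update.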
    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

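            # On failure, include the tail of the runner's log in our own log
            # to help diagnose what went wrong.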
            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            self.arvrunner.process_done(record["uuid"])