from future import standard_library
standard_library.install_aliases()
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urllib.parse
from functools import partial
import logging
import json
import subprocess32 as subprocess
from collections import namedtuple

from io import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')


def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
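
# Illustrative example (comment only, not executed): a File literal that was
# assigned an anonymous id keeps its content but loses the synthetic location.
#
#   literal = {"class": "File", "location": "_:0c5f7e3a", "contents": "x"}
#   trim_anonymous_location(literal)
#   # literal == {"class": "File", "contents": "x"}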


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.values():
                find_defaults(i, op)


def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)


def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)
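
# Note on the patterns fed to setSecondary(): they follow the CWL
# secondaryFiles substitution rules implemented by cwltool's substitute().
# For example (comment only, not executed):
#
#   substitute("file:///data/reads.bam", ".bai")   # -> "file:///data/reads.bam.bai"
#   substitute("file:///data/reads.bam", "^.bai")  # -> "file:///data/reads.bai"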


def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    def only_real(obj):
        # Only interested in local files that need to be uploaded,
        # don't include file literals, keep references, etc.
        sp = obj.get("location", "").split(":")
        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
            sc.append(obj)

    visit_class(sc_result, ("File", "Directory"), only_real)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered.keys()):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
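
# Usage sketch (comment only): upload_job_order() below calls this as
#
#   mapper = upload_dependencies(arvrunner, name, tool.doc_loader,
#                                job_order, job_order.get("id", "#"),
#                                False)
#
# after which every local "file:..." location in job_order has been rewritten
# in place to a "keep:<portable data hash>/<path>" reference.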


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in list(rewrites.items())}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed
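
# Note (per cwltool's pack() behavior): the returned document typically
# bundles the top-level workflow and every tool it runs under a single
# "$graph" list, so it can be stored and re-run as one self-contained file.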


def tag_git_version(packed):
    if packed.get("id", "").startswith("file://"):
        path = os.path.dirname(packed["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
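
# Illustrative before/after (comment only): a local input such as
#
#   {"reads": {"class": "File", "location": "file:///home/me/reads.fastq"}}
#
# comes back with its location rewritten to a Keep reference, e.g.
#
#   {"reads": {"class": "File", "location": "keep:<pdh>/reads.fastq"}}
#
# where <pdh> is the portable data hash of the uploaded collection.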


FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in list(pm.items()):
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map
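
# Shape of the returned merged_map (comment only, values illustrative): one
# FileUpdates entry per tool document id, e.g.
#
#   {"file:///path/wf.cwl": FileUpdates(
#        resolved={"file:///path/script.py": "keep:<pdh>/script.py"},
#        secondaryFiles={})}
#
# which packed_workflow() above uses to rewrite locations in the packed document.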


def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
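
    # For reference, a workflow can size the runner via an Arvados hint
    # (YAML, illustrative; assumes the arv: namespace is mapped to
    # http://arvados.org/cwl#):
    #
    #   hints:
    #     arv:WorkflowRunnerResources:
    #       coresMin: 2
    #       ramMin: 2048
    #       keep_cache: 512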

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    outputs = json.load(f)

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)