# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems

import os
import urllib.parse
import logging
import json
from functools import partial
import subprocess32 as subprocess
from collections import namedtuple

from io import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
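
# Illustrative note (not part of the original module): a File literal such as
# {"class": "File", "location": "_:b0b9...", "contents": "hello"} would have its
# synthetic "_:" location removed by trim_anonymous_location, while a real
# reference like {"class": "File", "location": "keep:..."} is left untouched.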

def remove_redundant_fields(obj):
    # Strip fields that can be recomputed from 'location' and would otherwise
    # leak local filesystem details into the stored record.
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    # Recursively visit d, applying op to any dict that carries a "default" field.
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)
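
# Illustrative sketch (assumed example, not from the original source): given an
# input parameter with secondaryFiles pattern ".bai" and a job order entry
# {"class": "File", "location": "file:///data/sample.bam"}, setSecondary adds
# {"class": "File", "location": "file:///data/sample.bam.bai"} via cwltool's
# substitute(); a leading "^" in the pattern would replace the extension instead.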

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    def only_real(obj):
        # Only interested in local files that need to be uploaded,
        # don't include file literals, keep references, etc.
        sp = obj.get("location", "").split(":")
        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
            sc.append(obj)

    visit_class(sc_result, ("File", "Directory"), only_real)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
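
# Illustrative sketch (assumed example, not from the original source): after
# upload_dependencies runs, a reference such as
#   {"class": "File", "location": "file:///home/me/input.fastq"}
# in workflowobj has been rewritten in place to something like
#   {"class": "File", "location": "keep:<portable data hash>/input.fastq"}
# and the returned mapper can translate the original local path on request.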

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}
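
    # Illustrative note (assumed example): pack() records id rewrites such as
    # {"file:///home/me/tool.cwl": "#tool.cwl"}; rewrite_to_orig inverts that so
    # the packed "#..." ids can be looked up in merged_map, which is keyed by
    # the original document ids.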

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed

def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash
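
# Illustrative note (assumed example): the `git log --format=%H` call above
# yields the HEAD commit hash of the checkout containing the workflow file, so a
# packed workflow built from a git tree carries that hash under
# "http://schema.org/version".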

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
299 def upload_workflow_deps(arvrunner, tool):
300 # Ensure that Docker images needed by this workflow are available
302 upload_docker(arvrunner, tool)
304 document_loader = tool.doc_loader
308 def upload_tool_deps(deptool):
310 discovered_secondaryfiles = {}
311 pm = upload_dependencies(arvrunner,
312 "%s dependencies" % (shortname(deptool["id"])),
317 include_primary=False,
318 discovered_secondaryfiles=discovered_secondaryfiles)
319 document_loader.idx[deptool["id"]] = deptool
321 for k,v in pm.items():
322 toolmap[k] = v.resolved
323 merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
325 tool.visit(upload_tool_deps)

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
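
# Illustrative note (assumed example): img is typically the runner's own image
# tag, e.g. "arvados/jobs:" + __version__ as used by Runner below, so submission
# fails fast when that image can neither be found nor pulled.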

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
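
# Illustrative note (assumed example): the value returned above is a Keep
# portable data hash of the form "<md5>+<size>", e.g.
# "99999999999999999999999999999999+118", which identifies the stored
# "workflow.cwl" content regardless of which collection record points at it.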

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    # Exit code 33 is used to flag an UnsupportedRequirement error.
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                # Rewrite relative output paths as keep: references into the
                # output collection.
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)