# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems

import os
import sys
import urllib.parse
import json
import logging
from functools import partial
from collections import namedtuple
from io import StringIO

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not to have a 'location'
    field, filter it out.

    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]

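# Hedged illustration (not part of the original module): a File literal such as
#     {"class": "File", "location": "_:0b3c...", "basename": "blub.txt", "contents": "blub"}
# has its anonymous "_:" location stripped by trim_anonymous_location(), so the
# record is written back out with only its literal contents.
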
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

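# Hedged illustration (not part of the original module): for an input parameter
# declared with
#     secondaryFiles: [".bai"]
# and a job order value
#     {"class": "File", "location": "file:///data/reads.bam"}
# setSecondary() uses cwltool's substitute() pattern rules to fill in
#     "secondaryFiles": [{"class": "File", "location": "file:///data/reads.bam.bai"}]
# and, when a "discovered" dict is supplied, records that list under the primary
# file's location.
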
def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)


def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references. Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """
    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)
    sc = []
    def only_real(obj):
        # Only interested in local files that need to be uploaded,
        # don't include file literals, keep references, etc.
        sp = obj.get("location", "").split(":")
        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
            sc.append(obj)

    visit_class(sc_result, ("File", "Directory"), only_real)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)
    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

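# Hedged usage sketch (not part of the original module; identifiers mirror the
# callers later in this file): upload_dependencies() is typically invoked with a
# loaded tool document, e.g.
#
#     mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                  tool.doc_loader, tool.tool, tool.tool["id"],
#                                  False)
#
# after which local "file:///..." locations inside the document have been
# rewritten in place to "keep:<portable data hash>/<path>" references.
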
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

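# Hedged illustration (not part of the original module): a tool carrying a
# requirement such as
#
#     requirements:
#       DockerRequirement:
#         dockerPull: ubuntu:18.04
#
# has that image made available through arv_docker_get_image(); when no
# DockerRequirement is present, the default "arvados/jobs" image is used, and a
# Workflow recurses into each step's embedded tool.
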
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed


def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

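# Hedged illustration (not part of the original module): an input object such as
#
#     {"reads": {"class": "File", "location": "file:///data/reads.bam"}}
#
# comes back from upload_job_order() with the location rewritten to a Keep
# reference of the form "keep:<portable data hash>/reads.bam", and with any
# "id" or "job_order" keys added by cwltool filtered out.
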
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])


def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

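# Hedged illustration (not part of the original module): the merged_map returned
# by upload_workflow_deps() has roughly the shape
#
#     {"file:///path/to/tool.cwl": FileUpdates(
#         resolved={"file:///path/to/script.py": "keep:<pdh>/script.py"},
#         secondaryFiles={"keep:<pdh>/input.bam": [...]})}
#
# and is later consumed by packed_workflow() to rewrite locations in the packed
# document.
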
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

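# Hedged usage sketch (not part of the original module; names are illustrative):
#
#     pdh = upload_workflow_collection(arvrunner, "my-workflow.cwl", packed)
#
# The return value is the portable data hash (of the form "<md5 hex>+<size>") of
# the collection holding workflow.cwl; if a collection with the same content and
# a matching name already exists in the target project, that existing collection
# is reused rather than saving a new one.
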
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse

        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}
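        # Hedged illustration (not part of the original module): the
        # WorkflowRunnerResources hint read above could look roughly like this in
        # a workflow (with "arv" mapped to http://arvados.org/cwl#):
        #
        #     hints:
        #       arv:WorkflowRunnerResources:
        #         coresMin: 2
        #         ramMin: 2048
        #         keep_cache: 512
        #
        # An explicit submit_runner_ram argument (e.g. from the command line)
        # still overrides the hint, as handled above.
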
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
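        # Hedged illustration (not part of the original module): keepify() above turns
        # a relative output location into a Keep reference, e.g. a File recorded as
        #     {"class": "File", "location": "foo/bar.txt"}
        # in an output collection whose portable data hash is <pdh> becomes
        #     {"class": "File", "location": "keep:<pdh>/foo/bar.txt"}
        # before the outputs are handed to the output callback.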