from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import (UnsupportedRequirement, normalizeFilesDirs,
+from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
shortname, Process, fill_in_defaults)
-from cwltool.load_tool import fetch_document
+from cwltool.load_tool import fetch_document, jobloaderctx
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate
+import schema_salad.ref_resolver
import arvados.collection
import arvados.util
if sfname is None:
continue
- p_location = primary["location"]
- if "/" in p_location:
- sfpath = (
- p_location[0 : p_location.rindex("/") + 1]
- + sfname
- )
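+        # sfname may be a plain string (a filename to resolve against the
+        # primary file's location), a File/Directory object, or a list of
+        # such objects; each shape is handled separately below.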
+ if isinstance(sfname, str):
+ p_location = primary["location"]
+ if "/" in p_location:
+ sfpath = (
+ p_location[0 : p_location.rindex("/") + 1]
+ + sfname
+ )
required = builder.do_eval(required, context=primary)
- if fsaccess.exists(sfpath):
- if pattern is not None:
- found.append({"location": sfpath, "class": "File"})
- else:
- found.append(sf)
- elif required:
- raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
- "Required secondary file '%s' does not exist" % sfpath)
+        if isinstance(sfname, (list, dict)):
+ each = aslist(sfname)
+ for e in each:
+ if required and not fsaccess.exists(e.get("location")):
+ raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+ "Required secondary file '%s' does not exist" % e.get("location"))
+ found.extend(each)
+
+ if isinstance(sfname, str):
+ if fsaccess.exists(sfpath):
+ if pattern is not None:
+ found.append({"location": sfpath, "class": "File"})
+ else:
+ found.append(sf)
+ elif required:
+ raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+ "Required secondary file '%s' does not exist" % sfpath)
primary["secondaryFiles"] = cmap(found)
if discovered is not None:
scanobj = workflowobj
if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
- # Need raw file content (before preprocessing) to ensure
- # that external references in $include and $mixin are captured.
- scanobj = loadref("", workflowobj["id"])
+ defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+ if cache is not None and defrg not in cache:
+            # if we haven't seen this file before, we want the raw file
+            # content (before preprocessing) to ensure that external
+            # references like $include haven't already been inlined.
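+            # (for example, a {"$include": "script.sh"} directive is
+            # replaced by the loader with the text of script.sh, losing
+            # the reference to the external file; script.sh is illustrative)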
+ scanobj = loadref("", workflowobj["id"])
metadata = scanobj
normalizeFilesDirs(sc)
- if include_primary and "id" in workflowobj:
- sc.append({"class": "File", "location": workflowobj["id"]})
+ if "id" in workflowobj:
+ defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+ if include_primary:
+ # make sure it's included
+ sc.append({"class": "File", "location": defrg})
+ else:
+ # make sure it's excluded
+ sc = [d for d in sc if d.get("location") != defrg]
def visit_default(obj):
def defaults_are_optional(f):
upload_docker(arvrunner, s.embedded_tool, runtimeContext)
-def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
"""Create a packed workflow.
A "packed" workflow is one where all the components have been combined into a single document."""
for l in v:
visit(l, cur_id)
visit(packed, None)
+
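+    # git_info, when provided, is a flat mapping of metadata keys that is
+    # copied verbatim into the top level of the packed document (the keys,
+    # e.g. details of the source git commit, are supplied by the caller).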
+ if git_info:
+ for g in git_info:
+ packed[g] = git_info[g]
+
return packed
tool.tool["inputs"],
job_order)
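+    # build a separate loader for resolving references in the job order
+    # document, reusing the fetcher from the tool's document loader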
+ _jobloaderctx = jobloaderctx.copy()
+ jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
+
jobmapper = upload_dependencies(arvrunner,
name,
- tool.doc_loader,
+ jobloader,
job_order,
job_order.get("id", "#"),
False,
merged_map = {}
tool_dep_cache = {}
+
+ todo = []
+
+    # Standard traversal is top down, but we want to go bottom up, so use
+    # the visitor to accumulate a list of nodes to visit, then visit them
+    # in reverse order; see the sketch after this function.
def upload_tool_deps(deptool):
if "id" in deptool:
- discovered_secondaryfiles = {}
- with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
- pm = upload_dependencies(arvrunner,
- "%s dependencies" % (shortname(deptool["id"])),
- document_loader,
- deptool,
- deptool["id"],
- False,
- runtimeContext,
- include_primary=False,
- discovered_secondaryfiles=discovered_secondaryfiles,
- cache=tool_dep_cache)
- document_loader.idx[deptool["id"]] = deptool
- toolmap = {}
- for k,v in pm.items():
- toolmap[k] = v.resolved
- merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+ todo.append(deptool)
tool.visit(upload_tool_deps)
+ for deptool in reversed(todo):
+ discovered_secondaryfiles = {}
+ with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+ pm = upload_dependencies(arvrunner,
+ "%s dependencies" % (shortname(deptool["id"])),
+ document_loader,
+ deptool,
+ deptool["id"],
+ False,
+ runtimeContext,
+ include_primary=False,
+ discovered_secondaryfiles=discovered_secondaryfiles,
+ cache=tool_dep_cache)
+ document_loader.idx[deptool["id"]] = deptool
+ toolmap = {}
+        for k, v in pm.items():
+ toolmap[k] = v.resolved
+ merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+
return merged_map
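
# A minimal sketch of the collect-then-reverse pattern used above, with
# illustrative names (not part of this module): the visitor records each
# node during its normal top-down walk, and iterating the recorded list
# in reverse yields a bottom-up order, so every embedded tool is
# processed before the workflow that refers to it.
#
#   todo = []
#   tool.visit(todo.append)        # top-down: record nodes in visit order
#   for node in reversed(todo):    # bottom-up: innermost tools first
#       process(node)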
def arvados_jobs_image(arvrunner, img, runtimeContext):
intermediate_output_ttl=0, merged_map=None,
priority=None, secret_store=None,
collection_cache_size=256,
- collection_cache_is_default=True):
+ collection_cache_is_default=True,
+ git_info=None):
loadingContext = loadingContext.copy()
loadingContext.metadata = updated_tool.metadata.copy()
self.priority = priority
self.secret_store = secret_store
self.enable_dev = loadingContext.enable_dev
+ self.git_info = git_info
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # default 1 GiB
self.arvrunner.output_callback({}, "permanentFail")
else:
self.arvrunner.output_callback(outputs, processStatus)
-
-
-
-
-# --- from cwltool ---
-
-
-CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
-
-
-def scandeps(
- base: str,
- doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
- reffields: Set[str],
- urlfields: Set[str],
- loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
- urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
- nestdirs: bool = True,
- do_normalize: bool = True,
-) -> Optional[MutableSequence[CWLObjectType]]:
-
- """Given a CWL document or input object, search for dependencies
- (references to external files) of 'doc' and return them as a list
- of File or Directory objects.
-
- The 'base' is the base URL for relative references.
-
- Looks for objects with 'class: File' or 'class: Directory' and
- adds them to the list of dependencies.
-
- Anything in 'urlfields' is also added as a File dependency.
-
- Anything in 'reffields' (such as workflow step 'run') will be
- added as a dependency and also loaded (using the 'loadref'
- function) and recursively scanned for dependencies. Those
- dependencies will be added as secondary files to the primary file.
-
- If "nestdirs" is true, create intermediate directory objects when
- a file is located in a subdirectory under the starting directory.
- This is so that if the dependencies are materialized, they will
- produce the same relative file system locations.
-
- """
-
- r: Optional[MutableSequence[CWLObjectType]] = None
- if isinstance(doc, MutableMapping):
- if "id" in doc:
- if cast(str, doc["id"]).startswith("file://"):
- df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
- if base != df:
- if r is None:
- r = []
- r.append({"class": "File", "location": df, "format": CWL_IANA})
- base = df
-
- if doc.get("class") in ("File", "Directory") and "location" in urlfields:
- u = cast(Optional[str], doc.get("location", doc.get("path")))
- if u and not u.startswith("_:"):
- deps = {
- "class": doc["class"],
- "location": urljoin(base, u),
- } # type: CWLObjectType
- if "basename" in doc:
- deps["basename"] = doc["basename"]
- if doc["class"] == "Directory" and "listing" in doc:
- deps["listing"] = doc["listing"]
- if doc["class"] == "File" and "secondaryFiles" in doc:
- sd = scandeps(
- base,
- cast(
- Union[CWLObjectType, MutableSequence[CWLObjectType]],
- doc["secondaryFiles"],
- ),
- reffields,
- urlfields,
- loadref,
- urljoin=urljoin,
- nestdirs=nestdirs,
- do_normalize=False,
- )
- if sd:
- deps["secondaryFiles"] = cast(
- CWLOutputAtomType,
- sd
- )
- if nestdirs:
- deps = nestdir(base, deps)
- if r is None:
- r = []
- r.append(deps)
- else:
- if doc["class"] == "Directory" and "listing" in doc:
- sd = scandeps(
- base,
- cast(MutableSequence[CWLObjectType], doc["listing"]),
- reffields,
- urlfields,
- loadref,
- urljoin=urljoin,
- nestdirs=nestdirs,
- do_normalize=False,
- )
- if sd:
- if r is None:
- r = []
- r.extend(sd)
- elif doc["class"] == "File" and "secondaryFiles" in doc:
- sd = scandeps(
- base,
- cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
- reffields,
- urlfields,
- loadref,
- urljoin=urljoin,
- nestdirs=nestdirs,
- do_normalize=False,
- )
- if sd:
- if r is None:
- r = sd
- else:
- r.extend(sd)
-
- for k, v in doc.items():
- if k in reffields:
- for u2 in aslist(v):
- if isinstance(u2, MutableMapping):
- sd = scandeps(
- base,
- u2,
- reffields,
- urlfields,
- loadref,
- urljoin=urljoin,
- nestdirs=nestdirs,
- do_normalize=False,
- )
- if sd:
- if r is None:
- r = sd
- else:
- r.extend(sd)
- else:
- subid = urljoin(base, u2)
- basedf, _ = urllib.parse.urldefrag(base)
- subiddf, _ = urllib.parse.urldefrag(subid)
- if basedf == subiddf:
- continue
- sub = cast(
- Union[MutableSequence[CWLObjectType], CWLObjectType],
- loadref(base, u2),
- )
- deps2 = {
- "class": "File",
- "location": subid,
- "format": CWL_IANA,
- } # type: CWLObjectType
- sf = scandeps(
- subid,
- sub,
- reffields,
- urlfields,
- loadref,
- urljoin=urljoin,
- nestdirs=nestdirs,
- do_normalize=False,
- )
- if sf:
- deps2["secondaryFiles"] = cast(
- MutableSequence[CWLOutputAtomType], mergedirs(sf)
- )
- if nestdirs:
- deps2 = nestdir(base, deps2)
- if r is None:
- r = []
- r.append(deps2)
- elif k in urlfields and k != "location":
- for u3 in aslist(v):
- deps = {"class": "File", "location": urljoin(base, u3)}
- if nestdirs:
- deps = nestdir(base, deps)
- if r is None:
- r = []
- r.append(deps)
- elif doc.get("class") in ("File", "Directory") and k in (
- "listing",
- "secondaryFiles",
- ):
- # should be handled earlier.
- pass
- else:
- sd = scandeps(
- base,
- cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
- reffields,
- urlfields,
- loadref,
- urljoin=urljoin,
- nestdirs=nestdirs,
- do_normalize=False,
- )
- if sd:
- if r is None:
- r = sd
- else:
- r.extend(sd)
- elif isinstance(doc, MutableSequence):
- for d in doc:
- sd = scandeps(
- base,
- d,
- reffields,
- urlfields,
- loadref,
- urljoin=urljoin,
- nestdirs=nestdirs,
- do_normalize=False,
- )
- if r is None:
- r = sd
- else:
- r.extend(sd)
-
- if r and do_normalize:
- normalizeFilesDirs(r)
-
- return r