self.project_uuid = runtimeContext.project_uuid
# Upload local file references in the job order.
- job_order = upload_job_order(self, "%s input" % runtimeContext.name,
- updated_tool, job_order, runtimeContext)
+ with Perf(metrics, "upload_job_order"):
+ job_order = upload_job_order(self, "%s input" % runtimeContext.name,
+ updated_tool, job_order, runtimeContext)
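+ # Perf (imported from .perf below) times the enclosed block and reports
+ # to the metrics logger. A minimal sketch of the idea, which may differ
+ # from the real implementation:
+ #
+ #   import time
+ #   class Perf:
+ #       def __init__(self, logger, name):
+ #           self.logger, self.name = logger, name
+ #       def __enter__(self):
+ #           self.start = time.time()
+ #       def __exit__(self, *exc):
+ #           self.logger.debug("%s: %.3fs", self.name, time.time() - self.start)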
# the last clause means: if it is a command line tool, and we
# are going to wait for the result, and always_submit_runner
loadingContext = self.loadingContext.copy()
loadingContext.do_validate = False
+ loadingContext.disable_js_validation = True
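+ # The document was already validated when it was first loaded, so
+ # javascript expression validation (like do_validate above) can be
+ # skipped on this reload to save time.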
if submitting:
loadingContext.do_update = False
# Document may have been auto-updated. Reload the original
# document with updating disabled because we want to
# submit the document with its original CWL version, not
# the auto-updated one.
- tool = load_tool(updated_tool.tool["id"], loadingContext)
+ with Perf(metrics, "load_tool original"):
+ tool = load_tool(updated_tool.tool["id"], loadingContext)
else:
tool = updated_tool
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- merged_map = upload_workflow_deps(self, tool, runtimeContext)
+ logger.info("Uploading workflow dependencies")
+ with Perf(metrics, "upload_workflow_deps"):
+ merged_map = upload_workflow_deps(self, tool, runtimeContext)
# Recreate process object (ArvadosWorkflow or
# ArvadosCommandTool) because tool document may have been
loadingContext.loader = tool.doc_loader
loadingContext.avsc_names = tool.doc_schema
loadingContext.metadata = tool.metadata
- tool = load_tool(tool.tool, loadingContext)
+ with Perf(metrics, "load_tool"):
+ tool = load_tool(tool.tool, loadingContext)
if runtimeContext.update_workflow or runtimeContext.create_workflow:
# Create a pipeline template or workflow record and exit.
import copy
from collections import namedtuple
from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Mapping,
+ MutableMapping,
+ MutableSequence,
+ Optional,
+ Sequence,
+ Set,
+ Sized,
+ Tuple,
+ Type,
+ Union,
+ cast,
+)
+from cwltool.utils import (
+ CWLObjectType,
+ CWLOutputAtomType,
+ CWLOutputType,
+)
if os.name == "posix" and sys.version_info[0] < 3:
import subprocess32 as subprocess
from ._version import __version__
from . import done
from . context import ArvRuntimeContext
+from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
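+# The metrics logger is a child of the main logger, so timing output can
+# be switched on independently of normal log output; e.g. (assuming a
+# --metrics command line option is wired up to do this):
+#
+#   logging.getLogger('arvados.cwl-runner.metrics').setLevel(logging.DEBUG)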
def trim_anonymous_location(obj):
"""Remove 'location' field from File and Directory literals.
if sfname is None:
continue
- p_location = primary["location"]
- if "/" in p_location:
- sfpath = (
- p_location[0 : p_location.rindex("/") + 1]
- + sfname
- )
+ if isinstance(sfname, str):
+ p_location = primary["location"]
+ if "/" in p_location:
+ sfpath = (
+ p_location[0 : p_location.rindex("/") + 1]
+ + sfname
+ )
required = builder.do_eval(required, context=primary)
- if fsaccess.exists(sfpath):
- if pattern is not None:
- found.append({"location": sfpath, "class": "File"})
- else:
- found.append(sf)
- elif required:
- raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
- "Required secondary file '%s' does not exist" % sfpath)
+ if isinstance(sfname, (list, dict)):
+ each = aslist(sfname)
+ for e in each:
+ if required and not fsaccess.exists(e.get("location")):
+ raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+ "Required secondary file '%s' does not exist" % e.get("location"))
+ found.extend(each)
+
+ if isinstance(sfname, str):
+ if fsaccess.exists(sfpath):
+ if pattern is not None:
+ found.append({"location": sfpath, "class": "File"})
+ else:
+ found.append(sf)
+ elif required:
+ raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+ "Required secondary file '%s' does not exist" % sfpath)
primary["secondaryFiles"] = cmap(found)
if discovered is not None:
def upload_dependencies(arvrunner, name, document_loader,
workflowobj, uri, loadref_run, runtimeContext,
- include_primary=True, discovered_secondaryfiles=None):
+ include_primary=True, discovered_secondaryfiles=None,
+ cache=None):
"""Upload the dependencies of the workflowobj document to Keep.
Returns a pathmapper object mapping local paths to keep references. Also
defrg, _ = urllib.parse.urldefrag(joined)
if defrg not in loaded:
loaded.add(defrg)
+ if cache is not None and defrg in cache:
+ return cache[defrg]
# Use fetch_text to get raw file (before preprocessing).
text = document_loader.fetch_text(defrg)
if isinstance(text, bytes):
else:
textIO = StringIO(text)
yamlloader = YAML(typ='safe', pure=True)
- return yamlloader.load(textIO)
+ result = yamlloader.load(textIO)
+ if cache is not None:
+ cache[defrg] = result
+ return result
else:
return {}
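+ # 'cache' memoizes raw document loads keyed by defragmented URI. Within
+ # a single upload_dependencies call the 'loaded' set already prevents
+ # re-reads; the cache pays off when one dict is shared across calls, as
+ # upload_workflow_deps does with tool_dep_cache:
+ #
+ #   shared = {}
+ #   upload_dependencies(..., cache=shared)  # first call fetches + parses
+ #   upload_dependencies(..., cache=shared)  # later calls reuse the result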
scanobj = workflowobj
if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
- # Need raw file content (before preprocessing) to ensure
- # that external references in $include and $mixin are captured.
- scanobj = loadref("", workflowobj["id"])
+ defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+ if cache is not None and defrg not in cache:
+ # If we haven't seen this file before, we want the raw file
+ # content (before preprocessing) to ensure that external
+ # references like $include haven't already been inlined.
+ scanobj = loadref("", workflowobj["id"])
metadata = scanobj
- sc_result = scandeps(uri, scanobj,
- loadref_fields,
- set(("$include", "location")),
- loadref, urljoin=document_loader.fetcher.urljoin,
- nestdirs=False)
+ with Perf(metrics, "scandeps include, location"):
+ sc_result = scandeps(uri, scanobj,
+ loadref_fields,
+ set(("$include", "location")),
+ loadref, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
- optional_deps = scandeps(uri, scanobj,
- loadref_fields,
- set(("$schemas",)),
- loadref, urljoin=document_loader.fetcher.urljoin,
- nestdirs=False)
+ with Perf(metrics, "scandeps $schemas"):
+ optional_deps = scandeps(uri, scanobj,
+ loadref_fields,
+ set(("$schemas",)),
+ loadref, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
- sc_result.extend(optional_deps)
+ if sc_result is None:
+ sc_result = []
+
+ if optional_deps is None:
+ optional_deps = []
+
+ if optional_deps:
+ sc_result.extend(optional_deps)
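+ # $schemas references are scanned separately and handed to ArvPathMapper
+ # as optional_deps (below), so an unresolvable schema does not fail the
+ # upload the way a missing input file would. The None checks guard
+ # against scandeps returning None rather than an empty list.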
sc = []
uuids = {}
sc.append(obj)
collect_uuids(obj)
- visit_class(workflowobj, ("File", "Directory"), collect_uuids)
- visit_class(sc_result, ("File", "Directory"), collect_uploads)
+ with Perf(metrics, "collect uuids"):
+ visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+
+ with Perf(metrics, "collect uploads"):
+ visit_class(sc_result, ("File", "Directory"), collect_uploads)
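+ # visit_class recursively walks a document and applies the callback to
+ # every node whose "class" is in the given set, e.g. (hypothetical doc):
+ #
+ #   visit_class({"inp": {"class": "File", "location": "keep:..."}},
+ #               ("File", "Directory"), collect_uuids)
+ #   # -> collect_uuids is called once, with the File node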
# Resolve any collection uuids we found to portable data hashes
# and assign them to uuid_map
uuid_map = {}
fetch_uuids = list(uuids.keys())
- while fetch_uuids:
- # For a large number of fetch_uuids, API server may limit
- # response size, so keep fetching from API server has nothing
- # more to give us.
- lookups = arvrunner.api.collections().list(
- filters=[["uuid", "in", fetch_uuids]],
- count="none",
- select=["uuid", "portable_data_hash"]).execute(
- num_retries=arvrunner.num_retries)
+ with Perf(metrics, "fetch_uuids"):
+ while fetch_uuids:
+ # For a large number of fetch_uuids, the API server may limit
+ # response size, so keep fetching until the API server has
+ # nothing more to give us.
+ lookups = arvrunner.api.collections().list(
+ filters=[["uuid", "in", fetch_uuids]],
+ count="none",
+ select=["uuid", "portable_data_hash"]).execute(
+ num_retries=arvrunner.num_retries)
- if not lookups["items"]:
- break
+ if not lookups["items"]:
+ break
- for l in lookups["items"]:
- uuid_map[l["uuid"]] = l["portable_data_hash"]
+ for l in lookups["items"]:
+ uuid_map[l["uuid"]] = l["portable_data_hash"]
- fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
+ fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
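+ # Lookup pattern: rather than paging by offset, each pass asks only for
+ # the uuids still unresolved and stops on an empty response, e.g.:
+ #
+ #   pass 1: filters=[["uuid", "in", all uuids]]    -> first batch of items
+ #   pass 2: filters only uuids not yet in uuid_map -> next batch
+ #   pass 3: empty "items" -> done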
normalizeFilesDirs(sc)
- if include_primary and "id" in workflowobj:
- sc.append({"class": "File", "location": workflowobj["id"]})
+ if "id" in workflowobj:
+ defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+ if include_primary:
+ # make sure it's included
+ sc.append({"class": "File", "location": defrg})
+ else:
+ # make sure it's excluded
+ sc = [d for d in sc if d.get("location") != defrg]
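+ # workflowobj["id"] may carry a fragment (e.g. "wf.cwl#main"); comparing
+ # defragmented locations ensures the exclusion actually matches the
+ # entries in sc, which refer to the file as a whole.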
def visit_default(obj):
def defaults_are_optional(f):
else:
del discovered[d]
- mapper = ArvPathMapper(arvrunner, sc, "",
- "keep:%s",
- "keep:%s/%s",
- name=name,
- single_collection=True,
- optional_deps=optional_deps)
+ with Perf(metrics, "mapper"):
+ mapper = ArvPathMapper(arvrunner, sc, "",
+ "keep:%s",
+ "keep:%s/%s",
+ name=name,
+ single_collection=True,
+ optional_deps=optional_deps)
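+ # single_collection=True asks ArvPathMapper to pack the uploaded
+ # dependencies into one new Keep collection instead of one per file,
+ # keeping the number of collection records and API round trips down.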
keeprefs = set()
def addkeepref(k):
p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
p[collectionUUID] = uuid
- visit_class(workflowobj, ("File", "Directory"), setloc)
- visit_class(discovered, ("File", "Directory"), setloc)
+ with Perf(metrics, "setloc"):
+ visit_class(workflowobj, ("File", "Directory"), setloc)
+ visit_class(discovered, ("File", "Directory"), setloc)
if discovered_secondaryfiles is not None:
for d in discovered:
def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- upload_docker(arvrunner, tool, runtimeContext)
+ with Perf(metrics, "upload_docker"):
+ upload_docker(arvrunner, tool, runtimeContext)
document_loader = tool.doc_loader
merged_map = {}
-
+ tool_dep_cache = {}
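+ # tool_dep_cache is shared by every upload_tool_deps call below, so a
+ # document referenced from several steps (a common requirements file, a
+ # subworkflow) is fetched and parsed only once.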
def upload_tool_deps(deptool):
if "id" in deptool:
discovered_secondaryfiles = {}
- pm = upload_dependencies(arvrunner,
- "%s dependencies" % (shortname(deptool["id"])),
- document_loader,
- deptool,
- deptool["id"],
- False,
- runtimeContext,
- include_primary=False,
- discovered_secondaryfiles=discovered_secondaryfiles)
+ with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+ pm = upload_dependencies(arvrunner,
+ "%s dependencies" % (shortname(deptool["id"])),
+ document_loader,
+ deptool,
+ deptool["id"],
+ False,
+ runtimeContext,
+ include_primary=False,
+ discovered_secondaryfiles=discovered_secondaryfiles,
+ cache=tool_dep_cache)
document_loader.idx[deptool["id"]] = deptool
toolmap = {}
for k,v in pm.items():