set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
return
+ if inputschema == "File":
+ inputschema = {"type": "File"}
+
if isinstance(inputschema, basestring):
sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
if sd:
set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
elif (inputschema["type"] == "File" and
- secondaryspec and
isinstance(primary, Mapping) and
- primary.get("class") == "File" and
- "secondaryFiles" not in primary):
+ primary.get("class") == "File"):
+
+ if "secondaryFiles" in primary or not secondaryspec:
+ # Nothing to do.
+ return
+
#
# Found a file, check for secondaryFiles
#
primary["secondaryFiles"] = secondaryspec
for i, sf in enumerate(aslist(secondaryspec)):
if builder.cwlVersion == "v1.0":
- pattern = builder.do_eval(sf, context=primary)
+ pattern = sf
else:
- pattern = builder.do_eval(sf["pattern"], context=primary)
+ pattern = sf["pattern"]
if pattern is None:
continue
if isinstance(pattern, list):
"Expression must return list, object, string or null")
if pattern is not None:
- sfpath = substitute(primary["location"], pattern)
+ if "${" in pattern or "$(" in pattern:
+ sfname = builder.do_eval(pattern, context=primary)
+ else:
+ sfname = substitute(primary["basename"], pattern)
+
+ if sfname is None:
+ continue
+
+ p_location = primary["location"]
+ if "/" in p_location:
+ sfpath = (
+ p_location[0 : p_location.rindex("/") + 1]
+ + sfname
+ )
required = builder.do_eval(required, context=primary)
primary["secondaryFiles"] = cmap(found)
if discovered is not None:
discovered[primary["location"]] = primary["secondaryFiles"]
- elif inputschema["type"] not in primitive_types_set:
+ elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run,
+ workflowobj, uri, loadref_run, runtimeContext,
include_primary=True, discovered_secondaryfiles=None):
"""Upload the dependencies of the workflowobj document to Keep.
keeprefs = set()
def addkeepref(k):
- keeprefs.add(collection_pdh_pattern.match(k).group(1))
+ if k.startswith("keep:"):
+ keeprefs.add(collection_pdh_pattern.match(k).group(1))
def setloc(p):
loc = p.get("location")
for d in discovered:
discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
- if arvrunner.runtimeContext.copy_deps:
+ if runtimeContext.copy_deps:
# Find referenced collections and copy them into the
# destination project, for easy sharing.
already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
filters=[["portable_data_hash", "in", list(keeprefs)],
- ["owner_uuid", "=", arvrunner.project_uuid]],
+ ["owner_uuid", "=", runtimeContext.project_uuid]],
select=["uuid", "portable_data_hash", "created_at"]))
keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
col = col["items"][0]
try:
arvrunner.api.collections().create(body={"collection": {
- "owner_uuid": arvrunner.project_uuid,
+ "owner_uuid": runtimeContext.project_uuid,
"name": col["name"],
"description": col["description"],
"properties": col["properties"],
return mapper
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
"""Uploads Docker images used in CommandLineTool objects."""
if isinstance(tool, CommandLineTool):
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker,
- arvrunner.runtimeContext.copy_deps)
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
else:
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
- True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker,
- arvrunner.runtimeContext.copy_deps)
+ True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
- upload_docker(arvrunner, s.embedded_tool)
+ upload_docker(arvrunner, s.embedded_tool, runtimeContext)
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
"""Create a packed workflow.
A "packed" workflow is one where all the components have been combined into a single document."""
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
if v.get("class") == "DockerRequirement":
v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
- arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker,
- arvrunner.runtimeContext.copy_deps)
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
packed["http://schema.org/version"] = githash
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
"""Upload local files referenced in the input object and return updated input
object with 'location' updated to the proper keep references.
"""
tool.doc_loader,
job_order,
job_order.get("id", "#"),
- False)
+ False,
+ runtimeContext)
if "id" in job_order:
del job_order["id"]
# Pair recording an uploaded file's resolved reference and its
# discovered secondaryFiles (keyed usage visible in upload_dependencies).
FileUpdates = namedtuple("FileUpdates", "resolved secondaryFiles")
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- upload_docker(arvrunner, tool)
+ upload_docker(arvrunner, tool, runtimeContext)
document_loader = tool.doc_loader
deptool,
deptool["id"],
False,
+ runtimeContext,
include_primary=False,
discovered_secondaryfiles=discovered_secondaryfiles)
document_loader.idx[deptool["id"]] = deptool
return merged_map
def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available.

    If not, try to pull and upload it.

    :param arvrunner: runner object supplying the ``api`` client used for
        the image lookup/upload.
    :param img: Docker image reference to resolve (e.g. ``arvados/jobs:<version>``).
    :param runtimeContext: carries the options forwarded to
        ``arv_docker_get_image`` (``project_uuid``, ``force_docker_pull``,
        ``tmp_outdir_prefix``, ``match_local_docker``, ``copy_deps``).
    :returns: the result of ``arv_docker_get_image`` for the resolved image.
    :raises Exception: wrapping any underlying error, if the image cannot be
        found, pulled, or uploaded.
    """
    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True,
                                                          runtimeContext.project_uuid,
                                                          runtimeContext.force_docker_pull,
                                                          runtimeContext.tmp_outdir_prefix,
                                                          runtimeContext.match_local_docker,
                                                          runtimeContext.copy_deps)
    except Exception as e:
        # Chain the original exception so the root cause (pull failure,
        # permission error, etc.) remains visible in the traceback instead
        # of only being flattened into the message string.
        raise Exception("Docker image %s is not available\n%s" % (img, e)) from e
-def upload_workflow_collection(arvrunner, name, packed):
+def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
collection = arvados.collection.Collection(api_client=arvrunner.api,
keep_client=arvrunner.keep_client,
num_retries=arvrunner.num_retries)
filters = [["portable_data_hash", "=", collection.portable_data_hash()],
["name", "like", name+"%"]]
- if arvrunner.project_uuid:
- filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+ if runtimeContext.project_uuid:
+ filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
if exists["items"]:
logger.info("Using collection %s", exists["items"][0]["uuid"])
else:
collection.save_new(name=name,
- owner_uuid=arvrunner.project_uuid,
+ owner_uuid=runtimeContext.project_uuid,
ensure_unique_name=True,
num_retries=arvrunner.num_retries)
logger.info("Uploaded to %s", collection.manifest_locator())