import schema_salad.validate as validate
import arvados.collection
+import arvados.util
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq
sc_result = scandeps(uri, scanobj,
loadref_fields,
- set(("$include", "$schemas", "location")),
+ set(("$include", "location")),
loadref, urljoin=document_loader.fetcher.urljoin,
nestdirs=False)
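+ # Scan $schemas separately from the required dependencies so they can be
+ # treated as optional: a schema reference that cannot be resolved is not
+ # meant to be a hard failure for the upload.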
+ optional_deps = scandeps(uri, scanobj,
+ loadref_fields,
+ set(("$schemas",)),
+ loadref, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
+
+ sc_result.extend(optional_deps)
+
sc = []
uuids = {}
if include_primary and "id" in workflowobj:
sc.append({"class": "File", "location": workflowobj["id"]})
- if "$schemas" in workflowobj:
- for s in workflowobj["$schemas"]:
- sc.append({"class": "File", "location": s})
-
def visit_default(obj):
- remove = [False]
- def ensure_default_location(f):
+ def defaults_are_optional(f):
if "location" not in f and "path" in f:
f["location"] = f["path"]
del f["path"]
- if "location" in f and not arvrunner.fs_access.exists(f["location"]):
- # Doesn't exist, remove from list of dependencies to upload
- sc[:] = [x for x in sc if x["location"] != f["location"]]
- # Delete "default" from workflowobj
- remove[0] = True
- visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
- if remove[0]:
- del obj["default"]
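+ # Defaults are recorded as optional dependencies instead of being
+ # deleted from the workflow when the referenced file does not exist.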
+ normalizeFilesDirs(f)
+ optional_deps.append(f)
+ visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
find_defaults(workflowobj, visit_default)
"keep:%s",
"keep:%s/%s",
name=name,
- single_collection=True)
+ single_collection=True,
+ optional_deps=optional_deps)
+
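+ # Track the portable data hash of every keep: reference so that, when
+ # copy_deps is enabled, the referenced collections can be copied into
+ # the destination project below.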
+ keeprefs = set()
+ def addkeepref(k):
+ if k.startswith("keep:"):
+ keeprefs.add(collection_pdh_pattern.match(k).group(1))
def setloc(p):
loc = p.get("location")
if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
p["location"] = mapper.mapper(p["location"]).resolved
+ addkeepref(p["location"])
return
if not loc:
return
gp = collection_uuid_pattern.match(loc)
if not gp:
+ # Not a uuid pattern (must be a pdh pattern)
+ addkeepref(p["location"])
return
+
uuid = gp.groups()[0]
if uuid not in uuid_map:
raise SourceLine(p, "location", validate.ValidationException).makeError(
for d in discovered:
discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
+ if arvrunner.runtimeContext.copy_deps:
+ # Find referenced collections and copy them into the
+ # destination project, for easy sharing.
+ already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
+ filters=[["portable_data_hash", "in", list(keeprefs)],
+ ["owner_uuid", "=", arvrunner.project_uuid]],
+ select=["uuid", "portable_data_hash", "created_at"]))
+
+ keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
+ for kr in keeprefs:
+ col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
+ order="created_at desc",
+ select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
+ limit=1).execute()
+ if len(col["items"]) == 0:
+ logger.warning("Cannot find collection with portable data hash %s", kr)
+ continue
+ col = col["items"][0]
+ try:
+ arvrunner.api.collections().create(body={"collection": {
+ "owner_uuid": arvrunner.project_uuid,
+ "name": col["name"],
+ "description": col["description"],
+ "properties": col["properties"],
+ "portable_data_hash": col["portable_data_hash"],
+ "manifest_text": col["manifest_text"],
+ "storage_classes_desired": col["storage_classes_desired"],
+ "trash_at": col["trash_at"]
+ }}, ensure_unique_name=True).execute()
+ except Exception as e:
+ logger.warning("Unable copy collection to destination: %s", e)
+
if "$schemas" in workflowobj:
sch = CommentedSeq()
for s in workflowobj["$schemas"]:
(docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
if docker_req:
if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
- # TODO: can be supported by containers API, but not jobs API.
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
+
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
arvrunner.runtimeContext.force_docker_pull,
arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ arvrunner.runtimeContext.match_local_docker,
+ arvrunner.runtimeContext.copy_deps)
else:
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
True, arvrunner.project_uuid,
arvrunner.runtimeContext.force_docker_pull,
arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ arvrunner.runtimeContext.match_local_docker,
+ arvrunner.runtimeContext.copy_deps)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool)
arvrunner.project_uuid,
arvrunner.runtimeContext.force_docker_pull,
arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ arvrunner.runtimeContext.match_local_docker,
+ arvrunner.runtimeContext.copy_deps)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
arvrunner.runtimeContext.force_docker_pull,
arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ arvrunner.runtimeContext.match_local_docker,
+ arvrunner.runtimeContext.copy_deps)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )