19070: Add --copy-deps/--no-copy-deps
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 42d4b552acc92bc8d76a3ffbbf32edc539682aad..6b670c73df4e6d58872193897223429337b4402f 100644 (file)
@@ -39,8 +39,9 @@ from cwltool.builder import Builder
 import schema_salad.validate as validate
 
 import arvados.collection
+import arvados.util
 from .util import collectionUUID
-import ruamel.yaml as yaml
+from ruamel.yaml import YAML
 from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
@@ -105,7 +106,8 @@ def make_builder(joborder, hints, requirements, runtimeContext, metadata):
                  outdir="",              # type: Text
                  tmpdir="",              # type: Text
                  stagedir="",            # type: Text
-                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion")
+                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
+                 container_engine="docker"
                 )
 
 def search_schemadef(name, reqs):
@@ -183,7 +185,10 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
             elif isinstance(pattern, dict):
                 specs.append(pattern)
             elif isinstance(pattern, str):
-                specs.append({"pattern": pattern})
+                if builder.cwlVersion == "v1.0":
+                    specs.append({"pattern": pattern, "required": True})
+                else:
+                    specs.append({"pattern": pattern, "required": sf.get("required")})
             else:
                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                     "Expression must return list, object, string or null")
@@ -192,7 +197,12 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
         for i, sf in enumerate(specs):
             if isinstance(sf, dict):
                 if sf.get("class") == "File":
-                    pattern = sf["basename"]
+                    pattern = None
+                    if sf.get("location") is None:
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "File object is missing 'location': %s" % sf)
+                    sfpath = sf["location"]
+                    required = True
                 else:
                     pattern = sf["pattern"]
                     required = sf.get("required")
@@ -203,11 +213,16 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                     "Expression must return list, object, string or null")
 
-            sfpath = substitute(primary["location"], pattern)
+            if pattern is not None:
+                sfpath = substitute(primary["location"], pattern)
+
             required = builder.do_eval(required, context=primary)
 
             if fsaccess.exists(sfpath):
-                found.append({"location": sfpath, "class": "File"})
+                if pattern is not None:
+                    found.append({"location": sfpath, "class": "File"})
+                else:
+                    found.append(sf)
             elif required:
                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                     "Required secondary file '%s' does not exist" % sfpath)
@@ -251,7 +266,8 @@ def upload_dependencies(arvrunner, name, document_loader,
                 textIO = StringIO(text.decode('utf-8'))
             else:
                 textIO = StringIO(text)
-            return yaml.safe_load(textIO)
+            yamlloader = YAML(typ='safe', pure=True)
+            return yamlloader.load(textIO)
         else:
             return {}
 
@@ -269,9 +285,18 @@ def upload_dependencies(arvrunner, name, document_loader,
     metadata = scanobj
 
     sc_result = scandeps(uri, scanobj,
-                  loadref_fields,
-                  set(("$include", "$schemas", "location")),
-                  loadref, urljoin=document_loader.fetcher.urljoin)
+                         loadref_fields,
+                         set(("$include", "location")),
+                         loadref, urljoin=document_loader.fetcher.urljoin,
+                         nestdirs=False)
+
+    optional_deps = scandeps(uri, scanobj,
+                                  loadref_fields,
+                                  set(("$schemas",)),
+                                  loadref, urljoin=document_loader.fetcher.urljoin,
+                                  nestdirs=False)
+
+    sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
@@ -329,24 +354,14 @@ def upload_dependencies(arvrunner, name, document_loader,
     if include_primary and "id" in workflowobj:
         sc.append({"class": "File", "location": workflowobj["id"]})
 
-    if "$schemas" in workflowobj:
-        for s in workflowobj["$schemas"]:
-            sc.append({"class": "File", "location": s})
-
     def visit_default(obj):
-        remove = [False]
-        def ensure_default_location(f):
+        def defaults_are_optional(f):
             if "location" not in f and "path" in f:
                 f["location"] = f["path"]
                 del f["path"]
-            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
-                # Doesn't exist, remove from list of dependencies to upload
-                sc[:] = [x for x in sc if x["location"] != f["location"]]
-                # Delete "default" from workflowobj
-                remove[0] = True
-        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
-        if remove[0]:
-            del obj["default"]
+            normalizeFilesDirs(f)
+            optional_deps.append(f)
+        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
 
     find_defaults(workflowobj, visit_default)
 
@@ -382,12 +397,18 @@ def upload_dependencies(arvrunner, name, document_loader,
                            "keep:%s",
                            "keep:%s/%s",
                            name=name,
-                           single_collection=True)
+                           single_collection=True,
+                           optional_deps=optional_deps)
+
+    keeprefs = set()
+    def addkeepref(k):
+        keeprefs.add(collection_pdh_pattern.match(k).group(1))
 
     def setloc(p):
         loc = p.get("location")
         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
+            addkeepref(p["location"])
             return
 
         if not loc:
@@ -409,7 +430,10 @@ def upload_dependencies(arvrunner, name, document_loader,
 
         gp = collection_uuid_pattern.match(loc)
         if not gp:
+            # Not a uuid pattern (must be a pdh pattern)
+            addkeepref(p["location"])
             return
+
         uuid = gp.groups()[0]
         if uuid not in uuid_map:
             raise SourceLine(p, "location", validate.ValidationException).makeError(
@@ -424,10 +448,43 @@ def upload_dependencies(arvrunner, name, document_loader,
         for d in discovered:
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
+    if arvrunner.runtimeContext.copy_deps:
+        # Find referenced collections and copy them into the
+        # destination project, for easy sharing.
+        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
+                                     filters=[["portable_data_hash", "in", list(keeprefs)],
+                                              ["owner_uuid", "=", arvrunner.project_uuid]],
+                                     select=["uuid", "portable_data_hash", "created_at"]))
+
+        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
+        for kr in keeprefs:
+            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
+                                                  order="created_at desc",
+                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
+                                                   limit=1).execute()
+            if len(col["items"]) == 0:
+                logger.warning("Cannot find collection with portable data hash %s", kr)
+                continue
+            col = col["items"][0]
+            try:
+                arvrunner.api.collections().create(body={"collection": {
+                    "owner_uuid": arvrunner.project_uuid,
+                    "name": col["name"],
+                    "description": col["description"],
+                    "properties": col["properties"],
+                    "portable_data_hash": col["portable_data_hash"],
+                    "manifest_text": col["manifest_text"],
+                    "storage_classes_desired": col["storage_classes_desired"],
+                    "trash_at": col["trash_at"]
+                }}, ensure_unique_name=True).execute()
+            except Exception as e:
+                logger.warning("Unable copy collection to destination: %s", e)
+
     if "$schemas" in workflowobj:
         sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
-            sch.append(mapper.mapper(s).resolved)
+            if s in mapper:
+                sch.append(mapper.mapper(s).resolved)
         workflowobj["$schemas"] = sch
 
     return mapper
@@ -440,17 +497,21 @@ def upload_docker(arvrunner, tool):
         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
         if docker_req:
             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
-                # TODO: can be supported by containers API, but not jobs API.
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
+
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                        arvrunner.runtimeContext.force_docker_pull,
-                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
+                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                       arvrunner.runtimeContext.match_local_docker,
+                                                       arvrunner.runtimeContext.copy_deps)
         else:
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                        True, arvrunner.project_uuid,
                                                        arvrunner.runtimeContext.force_docker_pull,
-                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
+                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                       arvrunner.runtimeContext.match_local_docker,
+                                                       arvrunner.runtimeContext.copy_deps)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
@@ -470,7 +531,7 @@ def packed_workflow(arvrunner, tool, merged_map):
 
     def visit(v, cur_id):
         if isinstance(v, dict):
-            if v.get("class") in ("CommandLineTool", "Workflow"):
+            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
                 if "id" in v:
@@ -478,15 +539,18 @@ def packed_workflow(arvrunner, tool, merged_map):
             if "path" in v and "location" not in v:
                 v["location"] = v["path"]
                 del v["path"]
-            if "location" in v and not v["location"].startswith("keep:"):
-                v["location"] = merged_map[cur_id].resolved[v["location"]]
-            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
-                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+            if "location" in v and cur_id in merged_map:
+                if v["location"] in merged_map[cur_id].resolved:
+                    v["location"] = merged_map[cur_id].resolved[v["location"]]
+                if v["location"] in merged_map[cur_id].secondaryFiles:
+                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                              arvrunner.project_uuid,
                                                                                                              arvrunner.runtimeContext.force_docker_pull,
-                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix)
+                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                                                                             arvrunner.runtimeContext.match_local_docker,
+                                                                                                             arvrunner.runtimeContext.copy_deps)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -593,7 +657,9 @@ def arvados_jobs_image(arvrunner, img):
     try:
         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                           arvrunner.runtimeContext.force_docker_pull,
-                                                          arvrunner.runtimeContext.tmp_outdir_prefix)
+                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                          arvrunner.runtimeContext.match_local_docker,
+                                                          arvrunner.runtimeContext.copy_deps)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )