Merge branch '21666-provision-test-improvement'
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 1544d05cd70660c6e046ef80073b7c80fb7c52c2..259294a36e6ccbf87df87c6124eda124aef03d0b 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -2,11 +2,6 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-from future import standard_library
-standard_library.install_aliases()
-from future.utils import  viewvalues, viewitems
-from past.builtins import basestring
-
 import os
 import sys
 import re
@@ -42,10 +37,7 @@ from cwltool.utils import (
     CWLOutputType,
 )
 
-if os.name == "posix" and sys.version_info[0] < 3:
-    import subprocess32 as subprocess
-else:
-    import subprocess
+import subprocess
 
 from schema_salad.sourceline import SourceLine, cmap
 
@@ -53,13 +45,14 @@ from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
-from cwltool.load_tool import fetch_document
+from cwltool.load_tool import fetch_document, jobloaderctx
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
 from cwltool.builder import Builder
 import schema_salad.validate as validate
+import schema_salad.ref_resolver
 
 import arvados.collection
 import arvados.util
@@ -68,12 +61,13 @@ from ruamel.yaml import YAML
 from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
-from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
+from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
 from ._version import __version__
 from . import done
 from . context import ArvRuntimeContext
 from .perf import Perf
 
+basestring = (bytes, str)
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
@@ -105,7 +99,7 @@ def find_defaults(d, op):
         if "default" in d:
             op(d)
         else:
-            for i in viewvalues(d):
+            for i in d.values():
                 find_defaults(i, op)
 
 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
@@ -294,7 +288,7 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, loadref_run, runtimeContext,
+                        workflowobj, uri, runtimeContext,
                         include_primary=True, discovered_secondaryfiles=None,
                         cache=None):
     """Upload the dependencies of the workflowobj document to Keep.
@@ -302,64 +296,27 @@ def upload_dependencies(arvrunner, name, document_loader,
     Returns a pathmapper object mapping local paths to keep references.  Also
     does an in-place update of references in "workflowobj".
 
-    Use scandeps to find $import, $include, $schemas, run, File and Directory
+    Use scandeps to find $schemas, File and Directory
     fields that represent external references.
 
     If workflowobj has an "id" field, this will reload the document to ensure
     it is scanning the raw document prior to preprocessing.
     """
 
-    loaded = set()
-    def loadref(b, u):
-        joined = document_loader.fetcher.urljoin(b, u)
-        defrg, _ = urllib.parse.urldefrag(joined)
-        if defrg not in loaded:
-            loaded.add(defrg)
-            if cache is not None and defrg in cache:
-                return cache[defrg]
-            # Use fetch_text to get raw file (before preprocessing).
-            text = document_loader.fetch_text(defrg)
-            if isinstance(text, bytes):
-                textIO = StringIO(text.decode('utf-8'))
-            else:
-                textIO = StringIO(text)
-            yamlloader = YAML(typ='safe', pure=True)
-            result = yamlloader.load(textIO)
-            if cache is not None:
-                cache[defrg] = result
-            return result
-        else:
-            return {}
-
-    if loadref_run:
-        loadref_fields = set(("$import", "run"))
-    else:
-        loadref_fields = set(("$import",))
-
     scanobj = workflowobj
-    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
-        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
-        if cache is not None and defrg not in cache:
-            # if we haven't seen this file before, want raw file
-            # content (before preprocessing) to ensure that external
-            # references like $include haven't already been inlined.
-            scanobj = loadref("", workflowobj["id"])
-
     metadata = scanobj
 
-    with Perf(metrics, "scandeps include, location"):
+    with Perf(metrics, "scandeps"):
         sc_result = scandeps(uri, scanobj,
-                             loadref_fields,
-                             set(("$include", "location")),
-                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             set(),
+                             set(("location",)),
+                             None, urljoin=document_loader.fetcher.urljoin,
                              nestdirs=False)
-
-    with Perf(metrics, "scandeps $schemas"):
         optional_deps = scandeps(uri, scanobj,
-                                      loadref_fields,
-                                      set(("$schemas",)),
-                                      loadref, urljoin=document_loader.fetcher.urljoin,
-                                      nestdirs=False)
+                             set(),
+                             set(("$schemas",)),
+                             None, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
 
     if sc_result is None:
         sc_result = []
@@ -483,15 +440,18 @@ def upload_dependencies(arvrunner, name, document_loader,
                                single_collection=True,
                                optional_deps=optional_deps)
 
+    for k, v in uuid_map.items():
+        mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
+
     keeprefs = set()
     def addkeepref(k):
         if k.startswith("keep:"):
             keeprefs.add(collection_pdh_pattern.match(k).group(1))
 
-    def setloc(p):
+
+    def collectloc(p):
         loc = p.get("location")
         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
-            p["location"] = mapper.mapper(p["location"]).resolved
             addkeepref(p["location"])
             return
 
@@ -522,12 +482,10 @@ def upload_dependencies(arvrunner, name, document_loader,
         if uuid not in uuid_map:
             raise SourceLine(p, "location", validate.ValidationException).makeError(
                 "Collection uuid %s not found" % uuid)
-        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
-        p[collectionUUID] = uuid
 
-    with Perf(metrics, "setloc"):
-        visit_class(workflowobj, ("File", "Directory"), setloc)
-        visit_class(discovered, ("File", "Directory"), setloc)
+    with Perf(metrics, "collectloc"):
+        visit_class(workflowobj, ("File", "Directory"), collectloc)
+        visit_class(discovered, ("File", "Directory"), collectloc)
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
@@ -551,6 +509,7 @@ def upload_dependencies(arvrunner, name, document_loader,
                 logger.warning("Cannot find collection with portable data hash %s", kr)
                 continue
             col = col["items"][0]
+            col["name"] = arvados.util.trim_name(col["name"])
             try:
                 arvrunner.api.collections().create(body={"collection": {
                     "owner_uuid": runtimeContext.project_uuid,
@@ -563,7 +522,7 @@ def upload_dependencies(arvrunner, name, document_loader,
                     "trash_at": col["trash_at"]
                 }}, ensure_unique_name=True).execute()
             except Exception as e:
-                logger.warning("Unable copy collection to destination: %s", e)
+                logger.warning("Unable to copy collection to destination: %s", e)
 
     if "$schemas" in workflowobj:
         sch = CommentedSeq()
@@ -585,20 +544,10 @@ def upload_docker(arvrunner, tool, runtimeContext):
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
 
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
-                                                       runtimeContext.project_uuid,
-                                                       runtimeContext.force_docker_pull,
-                                                       runtimeContext.tmp_outdir_prefix,
-                                                       runtimeContext.match_local_docker,
-                                                       runtimeContext.copy_deps)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
         else:
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
-                                                       True,
-                                                       runtimeContext.project_uuid,
-                                                       runtimeContext.force_docker_pull,
-                                                       runtimeContext.tmp_outdir_prefix,
-                                                       runtimeContext.match_local_docker,
-                                                       runtimeContext.copy_deps)
+                                                       True, runtimeContext)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
@@ -614,7 +563,7 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
                   rewrite_out=rewrites,
                   loader=tool.doc_loader)
 
-    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
+    rewrite_to_orig = {v: k for k,v in rewrites.items()}
 
     def visit(v, cur_id):
         if isinstance(v, dict):
@@ -633,11 +582,7 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
-                                                                                                             runtimeContext.project_uuid,
-                                                                                                             runtimeContext.force_docker_pull,
-                                                                                                             runtimeContext.tmp_outdir_prefix,
-                                                                                                             runtimeContext.match_local_docker,
-                                                                                                             runtimeContext.copy_deps)
+                                                                                                             runtimeContext)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -662,6 +607,73 @@ def tag_git_version(packed):
         else:
             packed["http://schema.org/version"] = githash
 
+def setloc(mapper, p):
+    loc = p.get("location")
+    if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
+        p["location"] = mapper.mapper(p["location"]).resolved
+        return
+
+    if not loc:
+        return
+
+    if collectionUUID in p:
+        uuid = p[collectionUUID]
+        keepuuid = "keep:"+uuid
+        if keepuuid not in mapper:
+            raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+                "Collection uuid %s not found" % uuid)
+        gp = collection_pdh_pattern.match(loc)
+        if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
+            # This file entry has both collectionUUID and a PDH
+            # location. If the PDH doesn't match the one returned by
+            # the API server, raise an error.
+            raise SourceLine(p, "location", validate.ValidationException).makeError(
+                "Expected collection uuid %s to be %s but API server reported %s" % (
+                    uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
+
+    gp = collection_uuid_pattern.match(loc)
+    if not gp:
+        # Not a uuid pattern (must be a pdh pattern)
+        return
+
+    uuid = gp.groups()[0]
+    keepuuid = "keep:"+uuid
+    if keepuuid not in mapper:
+        raise SourceLine(p, "location", validate.ValidationException).makeError(
+            "Collection uuid %s not found" % uuid)
+    p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
+    p[collectionUUID] = uuid
+
+def update_from_mapper(workflowobj, mapper):
+    with Perf(metrics, "setloc"):
+        visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
+
+def apply_merged_map(merged_map, workflowobj):
+    def visit(v, cur_id):
+        if isinstance(v, dict):
+            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
+                if "id" in v:
+                    cur_id = v["id"]
+            if "path" in v and "location" not in v:
+                v["location"] = v["path"]
+                del v["path"]
+            if "location" in v and cur_id in merged_map:
+                if v["location"] in merged_map[cur_id].resolved:
+                    v["location"] = merged_map[cur_id].resolved[v["location"]]
+                if v["location"] in merged_map[cur_id].secondaryFiles:
+                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+            #if v.get("class") == "DockerRequirement":
+            #    v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
+            #                                                                                                 runtimeContext)
+            for l in v:
+                visit(v[l], cur_id)
+        if isinstance(v, list):
+            for l in v:
+                visit(l, cur_id)
+    visit(workflowobj, None)
+
+def update_from_merged_map(tool, merged_map):
+    tool.visit(partial(apply_merged_map, merged_map))
 
 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
@@ -694,12 +706,14 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
                              tool.tool["inputs"],
                              job_order)
 
+    _jobloaderctx = jobloaderctx.copy()
+    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
+
     jobmapper = upload_dependencies(arvrunner,
                                     name,
-                                    tool.doc_loader,
+                                    jobloader,
                                     job_order,
                                     job_order.get("id", "#"),
-                                    False,
                                     runtimeContext)
 
     if "id" in job_order:
@@ -710,7 +724,9 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     if "job_order" in job_order:
         del job_order["job_order"]
 
-    return job_order
+    update_from_mapper(job_order, jobmapper)
+
+    return job_order, jobmapper
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
@@ -724,28 +740,38 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
 
     merged_map = {}
     tool_dep_cache = {}
+
+    todo = []
+
+    # Standard traversal is top-down, but we want to go bottom-up, so use
+    # the visitor to accumulate a list of nodes to visit, then
+    # visit them in reverse order.
     def upload_tool_deps(deptool):
         if "id" in deptool:
-            discovered_secondaryfiles = {}
-            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
-                pm = upload_dependencies(arvrunner,
-                                         "%s dependencies" % (shortname(deptool["id"])),
-                                         document_loader,
-                                         deptool,
-                                         deptool["id"],
-                                         False,
-                                         runtimeContext,
-                                         include_primary=False,
-                                         discovered_secondaryfiles=discovered_secondaryfiles,
-                                         cache=tool_dep_cache)
-            document_loader.idx[deptool["id"]] = deptool
-            toolmap = {}
-            for k,v in pm.items():
-                toolmap[k] = v.resolved
-            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+            todo.append(deptool)
 
     tool.visit(upload_tool_deps)
 
+    for deptool in reversed(todo):
+        discovered_secondaryfiles = {}
+        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+            pm = upload_dependencies(arvrunner,
+                                     "%s dependencies" % (shortname(deptool["id"])),
+                                     document_loader,
+                                     deptool,
+                                     deptool["id"],
+                                     runtimeContext,
+                                     include_primary=False,
+                                     discovered_secondaryfiles=discovered_secondaryfiles,
+                                     cache=tool_dep_cache)
+
+        document_loader.idx[deptool["id"]] = deptool
+        toolmap = {}
+        for k,v in pm.items():
+            toolmap[k] = v.resolved
+
+        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+
     return merged_map
 
 def arvados_jobs_image(arvrunner, img, runtimeContext):
@@ -753,12 +779,7 @@ def arvados_jobs_image(arvrunner, img, runtimeContext):
 
     try:
         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
-                                                          True,
-                                                          runtimeContext.project_uuid,
-                                                          runtimeContext.force_docker_pull,
-                                                          runtimeContext.tmp_outdir_prefix,
-                                                          runtimeContext.match_local_docker,
-                                                          runtimeContext.copy_deps)
+                                                          True, runtimeContext)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
@@ -792,7 +813,7 @@ class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, updated_tool,
+    def __init__(self, runner,
                  tool, loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
@@ -800,12 +821,12 @@ class Runner(Process):
                  priority=None, secret_store=None,
                  collection_cache_size=256,
                  collection_cache_is_default=True,
-                 git_info=None):
+                 git_info=None,
+                 reuse_runner=False):
 
-        loadingContext = loadingContext.copy()
-        loadingContext.metadata = updated_tool.metadata.copy()
+        self.loadingContext = loadingContext.copy()
 
-        super(Runner, self).__init__(updated_tool.tool, loadingContext)
+        super(Runner, self).__init__(tool.tool, loadingContext)
 
         self.arvrunner = runner
         self.embedded_tool = tool
@@ -817,6 +838,9 @@ class Runner(Process):
             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
+            reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
+            if reuse_req:
+                enable_reuse = reuse_req["enableReuse"]
         self.enable_reuse = enable_reuse
         self.uuid = None
         self.final_output = None
@@ -828,8 +852,10 @@ class Runner(Process):
         self.intermediate_output_ttl = intermediate_output_ttl
         self.priority = priority
         self.secret_store = secret_store
-        self.enable_dev = loadingContext.enable_dev
+        self.enable_dev = self.loadingContext.enable_dev
         self.git_info = git_info
+        self.fast_parser = self.loadingContext.fast_parser
+        self.reuse_runner = reuse_runner
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # default 1 GiB
@@ -892,7 +918,8 @@ class Runner(Process):
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
-                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
+                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
+                             include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))
 
             self.final_output = record["output"]
             outc = arvados.collection.CollectionReader(self.final_output,
@@ -914,3 +941,42 @@ class Runner(Process):
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
+
+
+def print_keep_deps_visitor(api, runtimeContext, references, doc_loader, tool):
+    def collect_locators(obj):
+        loc = obj.get("location", "")
+
+        g = arvados.util.keepuri_pattern.match(loc)
+        if g:
+            references.add(g[1])
+
+        if obj.get("class") == "http://arvados.org/cwl#WorkflowRunnerResources" and "acrContainerImage" in obj:
+            references.add(obj["acrContainerImage"])
+
+        if obj.get("class") == "DockerRequirement":
+            references.add(arvados_cwl.arvdocker.arv_docker_get_image(api, obj, False, runtimeContext))
+
+    sc_result = scandeps(tool["id"], tool,
+                         set(),
+                         set(("location", "id")),
+                         None, urljoin=doc_loader.fetcher.urljoin,
+                         nestdirs=False)
+
+    visit_class(sc_result, ("File", "Directory"), collect_locators)
+    visit_class(tool, ("DockerRequirement", "http://arvados.org/cwl#WorkflowRunnerResources"), collect_locators)
+
+
+def print_keep_deps(arvRunner, runtimeContext, merged_map, tool):
+    references = set()
+
+    tool.visit(partial(print_keep_deps_visitor, arvRunner.api, runtimeContext, references, tool.doc_loader))
+
+    for mm in merged_map:
+        for k, v in merged_map[mm].resolved.items():
+            g = arvados.util.keepuri_pattern.match(v)
+            if g:
+                references.add(g[1])
+
+    json.dump(sorted(references), arvRunner.stdout)
+    print(file=arvRunner.stdout)
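
For orientation, a hedged standalone sketch of the Keep-reference collection that print_keep_deps performs before writing its JSON output; the regex here is an assumption standing in for arvados.util.keepuri_pattern, and the locations are invented:

    import json
    import re
    import sys

    # Assumed stand-in for arvados.util.keepuri_pattern: capture the portable
    # data hash from a keep: URI, ignoring any path inside the collection.
    keepuri = re.compile(r"keep:([0-9a-f]{32}\+[0-9]+)(/.*)?")

    references = set()
    for loc in ["keep:d41d8cd98f00b204e9800998ecf8427e+0/input.txt",  # invented
                "file:///tmp/local.txt"]:
        g = keepuri.match(loc)
        if g:
            references.add(g.group(1))

    # Mirrors the tail of print_keep_deps: a sorted JSON array plus a newline.
    json.dump(sorted(references), sys.stdout)
    print()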