Merge branch '21535-multi-wf-delete'
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 80986a94d7811c6dee8288c60ed14d25e4bc6fa8..259294a36e6ccbf87df87c6124eda124aef03d0b 100644 (file)
@@ -2,11 +2,6 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-from future import standard_library
-standard_library.install_aliases()
-from future.utils import  viewvalues, viewitems
-from past.builtins import basestring
-
 import os
 import sys
 import re
@@ -42,10 +37,7 @@ from cwltool.utils import (
     CWLOutputType,
 )
 
-if os.name == "posix" and sys.version_info[0] < 3:
-    import subprocess32 as subprocess
-else:
-    import subprocess
+import subprocess
 
 from schema_salad.sourceline import SourceLine, cmap
 
@@ -75,6 +67,7 @@ from . import done
 from . context import ArvRuntimeContext
 from .perf import Perf
 
+basestring = (bytes, str)
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
@@ -106,7 +99,7 @@ def find_defaults(d, op):
         if "default" in d:
             op(d)
         else:
-            for i in viewvalues(d):
+            for i in d.values():
                 find_defaults(i, op)
 
 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
@@ -570,7 +563,7 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
                   rewrite_out=rewrites,
                   loader=tool.doc_loader)
 
-    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
+    rewrite_to_orig = {v: k for k,v in rewrites.items()}
 
     def visit(v, cur_id):
         if isinstance(v, dict):
@@ -589,11 +582,7 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
-                                                                                                             runtimeContext.project_uuid,
-                                                                                                             runtimeContext.force_docker_pull,
-                                                                                                             runtimeContext.tmp_outdir_prefix,
-                                                                                                             runtimeContext.match_local_docker,
-                                                                                                             runtimeContext.copy_deps)
+                                                                                                             runtimeContext)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -659,6 +648,33 @@ def update_from_mapper(workflowobj, mapper):
     with Perf(metrics, "setloc"):
         visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
 
def apply_merged_map(merged_map, workflowobj):
    """Rewrite file references in workflowobj using a precomputed merged_map.

    merged_map maps tool document ids to FileUpdates tuples: "resolved"
    rewrites original locations to their uploaded (Keep) references, and
    "secondaryFiles" supplies discovered secondary files for a location.
    workflowobj is modified in place; nothing is returned.
    """
    def visit(v, cur_id):
        if isinstance(v, dict):
            # Entering a tool document switches which merged_map entry
            # applies to the locations found below it.
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if "id" in v:
                    cur_id = v["id"]
            # Normalize legacy "path" fields to "location" before lookup.
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(workflowobj, None)
+
def update_from_merged_map(tool, merged_map):
    # Rewrite file references in every embedded tool document of "tool"
    # according to merged_map (see apply_merged_map).
    tool.visit(lambda workflowobj: apply_merged_map(merged_map, workflowobj))
+
 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
@@ -710,16 +726,13 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
 
     update_from_mapper(job_order, jobmapper)
 
-    #print(json.dumps(job_order, indent=2))
-
-    return job_order
+    return job_order, jobmapper
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
 def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    # commented out for testing only, uncomment me
     with Perf(metrics, "upload_docker"):
         upload_docker(arvrunner, tool, runtimeContext)
 
@@ -756,6 +769,7 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
         toolmap = {}
         for k,v in pm.items():
             toolmap[k] = v.resolved
+
         merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
 
     return merged_map
@@ -799,7 +813,7 @@ class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, updated_tool,
+    def __init__(self, runner,
                  tool, loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
@@ -807,12 +821,12 @@ class Runner(Process):
                  priority=None, secret_store=None,
                  collection_cache_size=256,
                  collection_cache_is_default=True,
-                 git_info=None):
+                 git_info=None,
+                 reuse_runner=False):
 
         self.loadingContext = loadingContext.copy()
-        self.loadingContext.metadata = updated_tool.metadata.copy()
 
-        super(Runner, self).__init__(updated_tool.tool, loadingContext)
+        super(Runner, self).__init__(tool.tool, loadingContext)
 
         self.arvrunner = runner
         self.embedded_tool = tool
@@ -841,6 +855,7 @@ class Runner(Process):
         self.enable_dev = self.loadingContext.enable_dev
         self.git_info = git_info
         self.fast_parser = self.loadingContext.fast_parser
+        self.reuse_runner = reuse_runner
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # defaut 1 GiB
@@ -903,7 +918,8 @@ class Runner(Process):
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
-                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
+                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
+                             include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))
 
             self.final_output = record["output"]
             outc = arvados.collection.CollectionReader(self.final_output,
@@ -925,3 +941,42 @@ class Runner(Process):
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
+
+
def print_keep_deps_visitor(api, runtimeContext, references, doc_loader, tool):
    """Collect Keep and Docker image references from one tool document.

    Scans "tool" (a CWL document node) for File/Directory locations that
    match the Keep URI pattern, for WorkflowRunnerResources hints naming a
    runner container image, and for DockerRequirements (resolved to their
    stored image reference without uploading).  Matches are added to the
    "references" set in place; nothing is returned.
    """
    def collect_locators(obj):
        loc = obj.get("location", "")

        # First capture group of keepuri_pattern is the part we record —
        # presumably the collection identifier (TODO confirm in arvados.util).
        g = arvados.util.keepuri_pattern.match(loc)
        if g:
            references.add(g[1])

        if obj.get("class") == "http://arvados.org/cwl#WorkflowRunnerResources" and "acrContainerImage" in obj:
            references.add(obj["acrContainerImage"])

        if obj.get("class") == "DockerRequirement":
            # Third argument False: look up the image reference only, do not pull/upload.
            references.add(arvados_cwl.arvdocker.arv_docker_get_image(api, obj, False, runtimeContext))

    sc_result = scandeps(tool["id"], tool,
                         set(),
                         set(("location", "id")),
                         None, urljoin=doc_loader.fetcher.urljoin,
                         nestdirs=False)

    visit_class(sc_result, ("File", "Directory"), collect_locators)
    visit_class(tool, ("DockerRequirement", "http://arvados.org/cwl#WorkflowRunnerResources"), collect_locators)


def print_keep_deps(arvRunner, runtimeContext, merged_map, tool):
    """Write a sorted JSON list of the workflow's Keep dependencies.

    Visits every embedded tool document of "tool" plus the merged file
    map and dumps the unique references as a JSON array, followed by a
    newline, to arvRunner.stdout.
    """
    references = set()

    tool.visit(partial(print_keep_deps_visitor, arvRunner.api, runtimeContext, references, tool.doc_loader))

    # Resolved locations in the merged map may also point into Keep; the
    # document ids and original locations (the keys) are not needed here.
    for file_updates in merged_map.values():
        for resolved in file_updates.resolved.values():
            g = arvados.util.keepuri_pattern.match(resolved)
            if g:
                references.add(g[1])

    json.dump(sorted(references), arvRunner.stdout)
    print(file=arvRunner.stdout)