13306: Changes to arvados-cwl-runner code after running futurize --stage2
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 29c0535d93c8a0ff164af01949ae152f056591d7..9f6390e3073e63d6482d404ec52d7330c6ddd40c 100644
@@ -1,22 +1,24 @@
+from future import standard_library
+standard_library.install_aliases()
 # Copyright (C) The Arvados Authors. All rights reserved.
 #
 # SPDX-License-Identifier: Apache-2.0
 
 import os
-import urlparse
+import urllib.parse
 from functools import partial
 import logging
 import json
 import subprocess32 as subprocess
 from collections import namedtuple
 
-from StringIO import StringIO
+from io import StringIO
 
 from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
+from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
 from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
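
The imports are the mechanical heart of the futurize --stage2 pass:
install_aliases() makes Python 3 module names resolve on Python 2, so
urlparse becomes urllib.parse and StringIO.StringIO becomes io.StringIO.
(The Process name added to the cwltool.process import serves the Runner
change further down.) A minimal sketch of what the two new header lines
enable, using only standard-library names:

    from future import standard_library
    standard_library.install_aliases()   # py3 module names now work on py2

    import urllib.parse                  # 'urlparse' on plain py2
    from io import StringIO              # replaces py2's StringIO.StringIO

    buf = StringIO(u"text")              # io.StringIO accepts only unicode on py2

One py2 caveat: unlike StringIO.StringIO, io.StringIO rejects byte strings,
hence the u"" literal above.
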
@@ -26,7 +28,7 @@ from cwltool.pack import pack
 import arvados.collection
 import ruamel.yaml as yaml
 
-from .arvdocker import arv_docker_get_image
+import arvados_cwl.arvdocker
 from .pathmapper import ArvPathMapper, trim_listing
 from ._version import __version__
 from . import done
@@ -61,7 +63,7 @@ def find_defaults(d, op):
         if "default" in d:
             op(d)
         else:
         if "default" in d:
             op(d)
         else:
-            for i in d.itervalues():
+            for i in d.values():
                 find_defaults(i, op)
 
 def setSecondary(t, fileobj, discovered):
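
dict.itervalues() is gone in Python 3; values() returns a list on py2 and a
view on py3, either of which suits this single-pass recursion. A
hypothetical call, relying on the isinstance guards earlier in
find_defaults:

    defaults = []
    doc = {"inputs": {"x": {"default": 5}, "y": {"type": "File"}}}
    find_defaults(doc, defaults.append)
    # defaults == [{"default": 5}]
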
@@ -98,7 +100,7 @@ def upload_dependencies(arvrunner, name, document_loader,
     loaded = set()
     def loadref(b, u):
         joined = document_loader.fetcher.urljoin(b, u)
-        defrg, _ = urlparse.urldefrag(joined)
+        defrg, _ = urllib.parse.urldefrag(joined)
         if defrg not in loaded:
             loaded.add(defrg)
             # Use fetch_text to get raw file (before preprocessing).
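
The same rename at a call site: urllib.parse.urldefrag is py3's spelling of
py2's urlparse.urldefrag. The defragmented URL is the key of the loaded
set, so each referenced document is fetched at most once no matter how many
fragments point into it. For illustration:

    url, frag = urllib.parse.urldefrag("file:///work/wf.cwl#step1")
    # url == "file:///work/wf.cwl", frag == "step1"
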
@@ -131,7 +133,8 @@ def upload_dependencies(arvrunner, name, document_loader,
     def only_real(obj):
         # Only interested in local files that need to be uploaded,
         # don't include file literals, keep references, etc.
-        if obj.get("location", "").startswith("file:"):
+        sp = obj.get("location", "").split(":")
+        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
             sc.append(obj)
 
     visit_class(sc_result, ("File", "Directory"), only_real)
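
This hunk goes beyond the mechanical port: the old startswith("file:") test
is widened so http: and https: references also count as uploadable, while
keep: locations and _: file literals still fall through. The predicate in
isolation (is_real and the sample inputs are hypothetical):

    def is_real(location):
        sp = location.split(":")
        return len(sp) > 1 and sp[0] in ("file", "http", "https")

    is_real("file:///tmp/a.txt")    # True  - local file to upload
    is_real("keep:9ae7d4...+54/x")  # False - already stored in Keep
    is_real("_:a9b3c7d2")           # False - literal, nothing to fetch
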
@@ -214,9 +217,9 @@ def upload_docker(arvrunner, tool):
                 # TODO: can be supported by containers API, but not jobs API.
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
-            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
         else:
-            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
@@ -231,7 +234,7 @@ def packed_workflow(arvrunner, tool, merged_map):
     packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                   tool.tool["id"], tool.metadata, rewrite_out=rewrites)
 
-    rewrite_to_orig = {v: k for k,v in rewrites.items()}
+    rewrite_to_orig = {v: k for k,v in list(rewrites.items())}
 
     def visit(v, cur_id):
         if isinstance(v, dict):
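
futurize's dict fixer conservatively wraps .items() in list() wherever it
cannot prove the lazy py3 view is safe. Both this comprehension and the
pm.items() loop further down consume the pairs immediately, so the plain
view would have been equivalent:

    rewrite_to_orig = {v: k for k, v in rewrites.items()}   # same result on py2 and py3
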
@@ -243,6 +246,8 @@ def packed_workflow(arvrunner, tool, merged_map):
                 v["location"] = merged_map[cur_id].resolved[v["location"]]
             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
                 v["location"] = merged_map[cur_id].resolved[v["location"]]
             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+            if v.get("class") == "DockerRequirement":
+                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
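
The added branch pins the Docker image at packing time: every
DockerRequirement node in the packed workflow gains the portable data hash
of the image that was actually uploaded, stored under an Arvados extension
key. A sketch of a resulting node (all values hypothetical):

    {
        "class": "DockerRequirement",
        "dockerPull": "arvados/jobs:1.1.4",
        "http://arvados.org/cwl#dockerCollectionPDH": "9ae7d4...+1234",
    }
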
@@ -311,7 +316,7 @@ def upload_workflow_deps(arvrunner, tool):
                                      discovered_secondaryfiles=discovered_secondaryfiles)
             document_loader.idx[deptool["id"]] = deptool
             toolmap = {}
-            for k,v in pm.items():
+            for k,v in list(pm.items()):
                 toolmap[k] = v.resolved
             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
 
@@ -323,10 +328,10 @@ def arvados_jobs_image(arvrunner, img):
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
-        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
-    return img
+
 
 def upload_workflow_collection(arvrunner, name, packed):
     collection = arvados.collection.Collection(api_client=arvrunner.api,
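
Two changes meet in arvados_jobs_image: the module-qualified arvdocker call,
and the function now returning whatever arv_docker_get_image returns rather
than echoing back its img argument, so callers get the resolved image
instead of the tag they passed in. Illustrative use (the exact return shape
is arv_docker_get_image's to define):

    pdh = arvados_jobs_image(arvrunner, "arvados/jobs:1.1.4")
    # formerly pdh == "arvados/jobs:1.1.4" regardless of what the lookup found
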
@@ -353,23 +358,28 @@ def upload_workflow_collection(arvrunner, name, packed):
     return collection.portable_data_hash()
 
 
-class Runner(object):
+class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, tool, job_order, enable_reuse,
+    def __init__(self, runner, tool, loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
                  intermediate_output_ttl=0, merged_map=None,
-                 priority=None, secret_store=None):
+                 priority=None, secret_store=None,
+                 collection_cache_size=256,
+                 collection_cache_is_default=True):
+
+        super(Runner, self).__init__(tool.tool, loadingContext)
+
         self.arvrunner = runner
-        self.tool = tool
-        self.job_order = job_order
+        self.embedded_tool = tool
+        self.job_order = None
         self.running = False
         if enable_reuse:
             # If reuse is permitted by command line arguments but
             # disabled by the workflow itself, disable it.
-            reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
+            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
         self.enable_reuse = enable_reuse
@@ -386,13 +396,16 @@ class Runner(object):
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # default 1 GiB
+        self.collection_cache_size = collection_cache_size
 
-        runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
         if runner_resource_req:
             if runner_resource_req.get("coresMin"):
                 self.submit_runner_cores = runner_resource_req["coresMin"]
             if runner_resource_req.get("ramMin"):
                 self.submit_runner_ram = runner_resource_req["ramMin"]
+            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
+                self.collection_cache_size = runner_resource_req["keep_cache"]
 
         if submit_runner_ram:
             # Command line / initializer overrides default and/or spec from workflow
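
The new collection_cache_size follows the same precedence as RAM and cores:
the built-in default of 256 (MiB) yields to a keep_cache value in the
workflow's WorkflowRunnerResources hint, but only while
collection_cache_is_default is True, i.e. only when no explicit size was
passed in. A hypothetical hint as it would arrive here after parsing:

    runner_resource_req = {
        "class": "http://arvados.org/cwl#WorkflowRunnerResources",
        "coresMin": 2,
        "ramMin": 2048,
        "keep_cache": 512,   # runner's collection cache, in MiB
    }
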
@@ -406,6 +419,15 @@ class Runner(object):
 
         self.merged_map = merged_map or {}
 
+    def job(self,
+            job_order,         # type: Mapping[Text, Text]
+            output_callbacks,  # type: Callable[[Any, Any], Any]
+            runtimeContext     # type: RuntimeContext
+           ):  # type: (...) -> Generator[Any, None, None]
+        self.job_order = job_order
+        self._init_job(job_order, runtimeContext)
+        yield self
+
     def update_pipeline_component(self, record):
         pass
 
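
The Runner rework is the substantive change in this class: subclassing
cwltool's Process (the import added in the first hunk) lets a Runner be
scheduled like any other step. The wrapped tool moves to embedded_tool, and
the new job() generator satisfies cwltool's execution interface by
initializing from job_order and yielding the Runner itself as the single
runnable. Roughly how an executor drives it (the loop is illustrative, not
cwltool's actual code):

    for runnable in runner.job(job_order, output_callback, runtimeContext):
        runnable.run(runtimeContext)   # subclasses submit the container/job here
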
@@ -441,7 +463,7 @@ class Runner(object):
                                                        keep_client=self.arvrunner.keep_client,
                                                        num_retries=self.arvrunner.num_retries)
             if "cwl.output.json" in outc:
-                with outc.open("cwl.output.json") as f:
+                with outc.open("cwl.output.json", "rb") as f:
                     if f.size() > 0:
                         outputs = json.load(f)
             def keepify(fileobj):
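
Opening cwl.output.json in "rb" keeps the read type consistent across
interpreters: bytes on both py2 and py3, which json.load handles directly
(on py3.6+ json decodes UTF-8 bytes itself). Condensed, with outc being the
output collection opened just above:

    with outc.open("cwl.output.json", "rb") as f:
        if f.size() > 0:             # skip an empty output file
            outputs = json.load(f)
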