15028: More conformance fixes
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
index 31ac4c2478dccc5dcf60775c8f1e28db5ec4632d..9ff06fa4aa23b3f9f661566f24ffa56db6b0d47d 100644 (file)
@@ -2,6 +2,12 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
+from __future__ import division
+from builtins import next
+from builtins import object
+from builtins import str
+from future.utils import viewvalues, viewitems
+
 import argparse
 import logging
 import os
@@ -39,6 +45,7 @@ from ._version import __version__
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
 from cwltool.command_line_tool import compute_checksums
+from cwltool.load_tool import load_tool
 
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
@@ -53,6 +60,7 @@ class RuntimeStatusLoggingHandler(logging.Handler):
     def __init__(self, runtime_status_update_func):
         super(RuntimeStatusLoggingHandler, self).__init__()
         self.runtime_status_update = runtime_status_update_func
+        self.updatingRuntimeStatus = False
 
     def emit(self, record):
         kind = None
@@ -60,22 +68,27 @@ class RuntimeStatusLoggingHandler(logging.Handler):
             kind = 'error'
         elif record.levelno >= logging.WARNING:
             kind = 'warning'
-        if kind is not None:
-            log_msg = record.getMessage()
-            if '\n' in log_msg:
-                # If the logged message is multi-line, use its first line as status
-                # and the rest as detail.
-                status, detail = log_msg.split('\n', 1)
-                self.runtime_status_update(
-                    kind,
-                    "%s: %s" % (record.name, status),
-                    detail
-                )
-            else:
-                self.runtime_status_update(
-                    kind,
-                    "%s: %s" % (record.name, record.getMessage())
-                )
+        if kind is not None and self.updatingRuntimeStatus is not True:
+            self.updatingRuntimeStatus = True
+            try:
+                log_msg = record.getMessage()
+                if '\n' in log_msg:
+                    # If the logged message is multi-line, use its first line as status
+                    # and the rest as detail.
+                    status, detail = log_msg.split('\n', 1)
+                    self.runtime_status_update(
+                        kind,
+                        "%s: %s" % (record.name, status),
+                        detail
+                    )
+                else:
+                    self.runtime_status_update(
+                        kind,
+                        "%s: %s" % (record.name, record.getMessage())
+                    )
+            finally:
+                self.updatingRuntimeStatus = False
+
 
 class ArvCwlExecutor(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
@@ -157,7 +170,7 @@ class ArvCwlExecutor(object):
                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
 
         if self.work_api == "jobs":
-            logger.warn("""
+            logger.warning("""
 *******************************
 Using the deprecated 'jobs' API.
 
@@ -180,6 +193,11 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         # if running inside a container
         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
             root_logger = logging.getLogger('')
+
+            # Remove existing RuntimeStatusLoggingHandlers if they exist
+            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
+            root_logger.handlers = handlers
+
             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
             root_logger.addHandler(handler)
 
@@ -332,7 +350,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                 if self.stop_polling.is_set():
                     break
                 with self.workflow_eval_lock:
-                    keys = list(self.processes.keys())
+                    keys = list(self.processes)
                 if not keys:
                     remain_wait = self.poll_interval
                     continue
@@ -347,11 +365,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
                 while keys:
                     page = keys[:pageSize]
-                    keys = keys[pageSize:]
                     try:
                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
-                    except Exception as e:
-                        logger.warn("Error checking states on API server: %s", e)
+                    except Exception:
+                        logger.exception("Error checking states on API server: %s")
                         remain_wait = self.poll_interval
                         continue
 
@@ -363,6 +380,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                 "new_attributes": p
                             }
                         })
+                    keys = keys[pageSize:]
+
                 finish_poll = time.time()
                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
         except:
@@ -382,12 +401,12 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         for i in self.intermediate_output_collections:
             try:
                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
-            except:
-                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
-            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+            except Exception:
+                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+            except (KeyboardInterrupt, SystemExit):
                 break
 
-    def check_features(self, obj):
+    def check_features(self, obj, parentfield=""):
         if isinstance(obj, dict):
             if obj.get("writable") and self.work_api != "containers":
                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
@@ -401,12 +420,15 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                             "Option 'dockerOutputDirectory' must be an absolute path.")
             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
-            for v in obj.itervalues():
-                self.check_features(v)
+            if obj.get("class") == "InplaceUpdateRequirement":
+                if obj["inplaceUpdate"] and parentfield == "requirements":
+                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
+            for k,v in viewitems(obj):
+                self.check_features(v, parentfield=k)
         elif isinstance(obj, list):
             for i,v in enumerate(obj):
                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
-                    self.check_features(v)
+                    self.check_features(v, parentfield=parentfield)
 
     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
         outputObj = copy.deepcopy(outputObj)
@@ -425,17 +447,16 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                               num_retries=self.num_retries)
 
         for k,v in generatemapper.items():
-            if k.startswith("_:"):
-                if v.type == "Directory":
+            if v.type == "Directory" and v.resolved.startswith("_:"):
                     continue
-                if v.type == "CreateFile":
-                    with final.open(v.target, "wb") as f:
-                        f.write(v.resolved.encode("utf-8"))
+            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
+                with final.open(v.target, "wb") as f:
+                    f.write(v.resolved.encode("utf-8"))
                     continue
 
-            if not k.startswith("keep:"):
+            if not v.resolved.startswith("keep:"):
                 raise Exception("Output source is not in keep or a literal")
-            sp = k.split("/")
+            sp = v.resolved.split("/")
             srccollection = sp[0][5:]
             try:
                 reader = self.collection_cache.get(srccollection)
@@ -445,7 +466,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                 raise
             except IOError as e:
-                logger.warn("While preparing output collection: %s", e)
+                logger.error("While preparing output collection: %s", e)
+                raise
 
         def rewrite(fileobj):
             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
@@ -457,7 +479,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         adjustFileObjs(outputObj, rewrite)
 
         with final.open("cwl.output.json", "w") as f:
-            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
+            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
+            f.write(res)
 
         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
 
@@ -494,8 +517,9 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                               body={
                                                   'is_trashed': True
                                               }).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.info("Setting container output: %s", e)
+            except Exception:
+                logger.exception("Setting container output")
+                return
         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                    body={
@@ -504,6 +528,18 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                        'progress':1.0
                                    }).execute(num_retries=self.num_retries)
 
+    def apply_reqs(self, job_order_object, tool):
+        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
+            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
+                raise WorkflowException(
+                    "`cwl:requirements` in the input object is not part of CWL "
+                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
+                    "can set the cwlVersion to v1.1.0-dev1 or greater and re-run with "
+                    "--enable-dev.")
+            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
+            for req in job_reqs:
+                tool.requirements.append(req)
+
     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
         self.debug = runtimeContext.debug
 
@@ -530,25 +566,38 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         if not runtimeContext.name:
             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
 
+        # Upload local file references in the job order.
+        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
+                                     tool, job_order)
+
+        submitting = (runtimeContext.update_workflow or
+                      runtimeContext.create_workflow or
+                      (runtimeContext.submit and not
+                       (tool.tool["class"] == "CommandLineTool" and
+                        runtimeContext.wait and
+                        not runtimeContext.always_submit_runner)))
+
+        loadingContext = self.loadingContext.copy()
+        loadingContext.do_validate = False
+        loadingContext.do_update = False
+        if submitting:
+            # Document may have been auto-updated. Reload the original
+            # document with updating disabled because we want to
+            # submit the original document, not the auto-updated one.
+            tool = load_tool(tool.tool["id"], loadingContext)
+
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
         merged_map = upload_workflow_deps(self, tool)
 
-        # Reload tool object which may have been updated by
-        # upload_workflow_deps
-        # Don't validate this time because it will just print redundant errors.
-        loadingContext = self.loadingContext.copy()
+        # Recreate process object (ArvadosWorkflow or
+        # ArvadosCommandTool) because tool document may have been
+        # updated by upload_workflow_deps in ways that modify
+        # inheritance of hints or requirements.
         loadingContext.loader = tool.doc_loader
         loadingContext.avsc_names = tool.doc_schema
         loadingContext.metadata = tool.metadata
-        loadingContext.do_validate = False
-
-        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
-                                  loadingContext)
-
-        # Upload local file references in the job order.
-        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
-                                     tool, job_order)
+        tool = load_tool(tool.tool, loadingContext)
 
         existing_uuid = runtimeContext.update_workflow
         if existing_uuid or runtimeContext.create_workflow:
@@ -573,6 +622,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                         merged_map=merged_map),
                         "success")
 
+        self.apply_reqs(job_order, tool)
+
         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
         self.eval_timeout = runtimeContext.eval_timeout
 
@@ -608,7 +659,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                         visited.add(m.group(1))
                         estimated_size[0] += int(m.group(2))
             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
-            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) / (1024*1024))+1, 256)
+            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
 
         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
@@ -719,8 +770,11 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         except:
             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                 logger.error("Interrupted, workflow will be cancelled")
+            elif isinstance(sys.exc_info()[1], WorkflowException):
+                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             else:
-                logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                logger.exception("Workflow execution failed")
+
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)