14886: Fixes mock issue between tests
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
index 535cfd7582b985ad806d659f518f9da9ce0e6fbc..319e8a887114b88b55865ca673dbafb3e0b9a7dc 100644 (file)
@@ -59,6 +59,7 @@ class RuntimeStatusLoggingHandler(logging.Handler):
     def __init__(self, runtime_status_update_func):
         super(RuntimeStatusLoggingHandler, self).__init__()
         self.runtime_status_update = runtime_status_update_func
+        self.updatingRuntimeStatus = False
 
     def emit(self, record):
         kind = None
@@ -66,22 +67,27 @@ class RuntimeStatusLoggingHandler(logging.Handler):
             kind = 'error'
         elif record.levelno >= logging.WARNING:
             kind = 'warning'
-        if kind is not None:
-            log_msg = record.getMessage()
-            if '\n' in log_msg:
-                # If the logged message is multi-line, use its first line as status
-                # and the rest as detail.
-                status, detail = log_msg.split('\n', 1)
-                self.runtime_status_update(
-                    kind,
-                    "%s: %s" % (record.name, status),
-                    detail
-                )
-            else:
-                self.runtime_status_update(
-                    kind,
-                    "%s: %s" % (record.name, record.getMessage())
-                )
+        if kind is not None and self.updatingRuntimeStatus is not True:
+            self.updatingRuntimeStatus = True
+            try:
+                log_msg = record.getMessage()
+                if '\n' in log_msg:
+                    # If the logged message is multi-line, use its first line as status
+                    # and the rest as detail.
+                    status, detail = log_msg.split('\n', 1)
+                    self.runtime_status_update(
+                        kind,
+                        "%s: %s" % (record.name, status),
+                        detail
+                    )
+                else:
+                    self.runtime_status_update(
+                        kind,
+                        "%s: %s" % (record.name, record.getMessage())
+                    )
+            finally:
+                self.updatingRuntimeStatus = False
+            
 
 class ArvCwlExecutor(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
@@ -361,8 +367,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                     keys = keys[pageSize:]
                     try:
                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
-                    except Exception as e:
-                        logger.warning("Error checking states on API server: %s", e)
+                    except Exception:
+                        logger.exception("Error checking states on API server: %s")
                         remain_wait = self.poll_interval
                         continue
 
@@ -393,9 +399,9 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         for i in self.intermediate_output_collections:
             try:
                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
-            except:
+            except Exception:
                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
-            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+            except (KeyboardInterrupt, SystemExit):
                 break
 
     def check_features(self, obj):
@@ -506,8 +512,9 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                               body={
                                                   'is_trashed': True
                                               }).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.info("Setting container output: %s", e)
+            except Exception:
+                logger.exception("Setting container output")
+                return
         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                    body={
@@ -731,8 +738,11 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         except:
             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                 logger.error("Interrupted, workflow will be cancelled")
+            elif isinstance(sys.exc_info()[1], WorkflowException):
+                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             else:
-                logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                logger.exception("Workflow execution failed")
+
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)