Merge branch '10847-nodemanager-shutdown' refs #10847
author Peter Amstutz <peter.amstutz@curoverse.com>
Sat, 10 Jun 2017 01:30:29 +0000 (21:30 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Sat, 10 Jun 2017 01:30:29 +0000 (21:30 -0400)
services/nodemanager/arvnodeman/computenode/__init__.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/test/fake_driver.py
services/nodemanager/arvnodeman/timedcallback.py
services/nodemanager/tests/integration_test.py

index 54d6a82bcefa1cf38ad06f58cbbf89fafe55ecd1..20b274b1587f6acbf276b193dda89370956a44ec 100644 (file)
@@ -82,8 +82,11 @@ class RetryMixin(object):
                             raise
 
                         self._logger.warning(
-                            "Client error: %s - waiting %s seconds",
-                            error, self.retry_wait, exc_info=error)
+                            "Client error: %s - %s %s seconds",
+                            error,
+                            "scheduling retry in" if self._timer else "sleeping",
+                            self.retry_wait,
+                            exc_info=error)
 
                         if self._timer:
                             start_time = time.time()
index c0413f626c0fe9ad3173af784e4f315baaaa289f..7e63c782ede1fecee931d088505aed549a21c9df 100644 (file)
@@ -517,6 +517,16 @@ class NodeManagerDaemonActor(actor_class):
     def shutdown(self):
         self._logger.info("Shutting down after signal.")
         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
+
+        # Shut down pollers
+        self._server_wishlist_actor.stop()
+        self._arvados_nodes_actor.stop()
+        self._cloud_nodes_actor.stop()
+
+        # Clear cloud node list
+        self.update_cloud_nodes([])
+
+        # Stop setup actors unless they are in the middle of setup.
         setup_stops = {key: node.stop_if_no_cloud_node()
                        for key, node in self.booting.iteritems()}
         self.booting = {key: self.booting[key]
index ee49305664444396ee027f738d9598be33caef4d..1785e0559ee8bd74a9045907088d54a19e55e946 100644 (file)
@@ -84,3 +84,16 @@ class QuotaDriver(FakeDriver):
         if len(all_nodes) == 0:
             quota = 4
         return True
+
+class FailingDriver(FakeDriver):  # fake driver whose create_node() always raises
+    def create_node(self, name=None,
+                    size=None,
+                    image=None,
+                    auth=None,
+                    ex_storage_account=None,
+                    ex_customdata=None,
+                    ex_resource_group=None,
+                    ex_user_name=None,
+                    ex_tags=None,
+                    ex_network=None):
+        raise Exception("nope")  # unconditional failure; every argument is ignored
index 12d6280873e8fe23669bbf6f1dce08a952bfcda2..c020a7ef033cf645f77675123f22dfb1378963ab 100644 (file)
@@ -29,9 +29,9 @@ class TimedCallBackActor(actor_class):
 
     def deliver(self):
         if not self.messages:
-            return None
+            return
         til_next = self.messages[0][0] - time.time()
-        if til_next < 0:
+        if til_next <= 0:
             t, receiver, args, kwargs = heapq.heappop(self.messages)
             try:
                 receiver(*args, **kwargs)
index 78233659a59b0aa978dcce35ba4e15cc9189bdf3..6bd7fd10323cefa7621f446e03866009efbf9162 100755 (executable)
@@ -19,8 +19,19 @@ import tempfile
 import shutil
 from functools import partial
 import arvados
+import StringIO
 
-logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger("logger")
+logger.setLevel(logging.INFO)
+logger.addHandler(logging.StreamHandler(sys.stderr))
+
+detail = logging.getLogger("detail")
+detail.setLevel(logging.INFO)
+if os.environ.get("ANMTEST_LOGLEVEL"):
+    detail_content = sys.stderr
+else:
+    detail_content = StringIO.StringIO()
+detail.addHandler(logging.StreamHandler(detail_content))
 
 fake_slurm = None
 compute_nodes = None
@@ -31,7 +42,7 @@ def update_script(path, val):
         f.write(val)
     os.chmod(path+"_", stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
     os.rename(path+"_", path)
-    logging.info("Update script %s: %s", path, val)
+    detail.info("Update script %s: %s", path, val)
 
 def set_squeue(g):
     global all_jobs
@@ -106,11 +117,11 @@ def run_test(name, actions, checks, driver_class, jobs):
     for n in api.nodes().list().execute()['items']:
         api.nodes().delete(uuid=n["uuid"]).execute()
 
-    logging.info("Start %s", name)
+    logger.info("Start %s", name)
 
     global fake_slurm
     fake_slurm = tempfile.mkdtemp()
-    logging.info("fake_slurm is %s", fake_slurm)
+    detail.info("fake_slurm is %s", fake_slurm)
 
     global compute_nodes
     compute_nodes = {}
@@ -127,8 +138,7 @@ def run_test(name, actions, checks, driver_class, jobs):
 
     # Write configuration file for test
     with open("tests/fake.cfg.template") as f:
-        with open(os.path.join(fake_slurm, "id_rsa.pub"), "w") as ssh:
-            pass
+        open(os.path.join(fake_slurm, "id_rsa.pub"), "w").close()
         with open(os.path.join(fake_slurm, "fake.cfg"), "w") as cfg:
             cfg.write(f.read().format(host=os.environ["ARVADOS_API_HOST"],
                                       token=os.environ["ARVADOS_API_TOKEN"],
@@ -154,15 +164,15 @@ def run_test(name, actions, checks, driver_class, jobs):
         # naive line iteration over pipes gets buffered, which isn't what we want,
         # see https://bugs.python.org/issue3907
         for line in iter(p.stderr.readline, ""):
-            sys.stdout.write(line)
+            detail_content.write(line)
 
             for k,v in checks.items():
                 g = re.match(k, line)
                 if g:
-                    logging.info("Matched check %s", k)
+                    detail.info("Matched check %s", k)
                     code += v(checks, k, g)
                     if code != 0:
-                        logging.error("Check failed")
+                        detail.error("Check failed")
                         if not terminated:
                             p.terminate()
                             terminated = True
@@ -171,7 +181,7 @@ def run_test(name, actions, checks, driver_class, jobs):
                 continue
 
             if time.time() > timeout:
-                logging.error("Exceeded timeout with actions remaining: %s", actions)
+                detail.error("Exceeded timeout with actions remaining: %s", actions)
                 code += 1
                 if not terminated:
                     p.terminate()
@@ -180,11 +190,11 @@ def run_test(name, actions, checks, driver_class, jobs):
             k, v = actions[0]
             g = re.match(k, line)
             if g:
-                logging.info("Matched action %s", k)
+                detail.info("Matched action %s", k)
                 actions.pop(0)
                 code += v(g)
                 if code != 0:
-                    logging.error("Action failed")
+                    detail.error("Action failed")
                     p.terminate()
                     terminated = True
 
@@ -195,15 +205,17 @@ def run_test(name, actions, checks, driver_class, jobs):
         p.kill()
 
     if actions:
-        logging.error("Ended with remaining actions: %s", actions)
+        detail.error("Ended with remaining actions: %s", actions)
         code = 1
 
     shutil.rmtree(fake_slurm)
 
     if code == 0:
-        logging.info("%s passed", name)
+        logger.info("%s passed", name)
     else:
-        logging.info("%s failed", name)
+        if isinstance(detail_content, StringIO()):
+            sys.stderr.write(detail_content.getvalue())
+        logger.info("%s failed", name)
 
     return code
 
@@ -304,6 +316,21 @@ def main():
              "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
              "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
              "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+         }),
+        "test_no_hang_failing_node_create": (
+            [
+                (r".*Daemon started", set_squeue),
+                (r".*Client error: nope", noop),
+                (r".*Client error: nope", noop),
+                (r".*Client error: nope", noop),
+                (r".*Client error: nope", noop),
+            ],
+            {},
+            "arvnodeman.test.fake_driver.FailingDriver",
+            {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
          })
     }
 
@@ -315,9 +342,9 @@ def main():
             code += run_test(t, *tests[t])
 
     if code == 0:
-        logging.info("Tests passed")
+        logger.info("Tests passed")
     else:
-        logging.info("Tests failed")
+        logger.info("Tests failed")
 
     exit(code)