Merge branch '10312-nodemanager-quotas' refs #10312
authorPeter Amstutz <peter.amstutz@curoverse.com>
Sat, 10 Jun 2017 01:29:59 +0000 (21:29 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Sat, 10 Jun 2017 01:29:59 +0000 (21:29 -0400)
18 files changed:
build/run-tests.sh
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/driver/__init__.py
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/jobqueue.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/arvnodeman/nodelist.py
services/nodemanager/arvnodeman/test/__init__.py [new file with mode: 0644]
services/nodemanager/arvnodeman/test/fake_driver.py [new file with mode: 0644]
services/nodemanager/doc/azure.example.cfg
services/nodemanager/doc/ec2.example.cfg
services/nodemanager/doc/gce.example.cfg
services/nodemanager/tests/fake.cfg.template [new file with mode: 0644]
services/nodemanager/tests/integration_test.py [new file with mode: 0755]
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_jobqueue.py

index dffbe32e8ab9c89d6cecf02dd445c8314d2560ad..352d05b945ea168fb0700614c9327bfd1e3fa033 100755 (executable)
@@ -78,6 +78,7 @@ services/keepstore
 services/keep-balance
 services/login-sync
 services/nodemanager
+services/nodemanager-integration
 services/crunch-run
 services/crunch-dispatch-local
 services/crunch-dispatch-slurm
@@ -854,6 +855,12 @@ test_login-sync() {
 }
 do_test services/login-sync login-sync
 
+test_nodemanager-integration() {
+    cd "$WORKSPACE/services/nodemanager" \
+        && tests/integration_test.py ${testargs[services/nodemanager-integration]}
+}
+do_test services/nodemanager-integration nodemanager-integration
+
 for p in "${pythonstuff[@]}"
 do
     dir=${p%:py3}
index 9ee26e336d2849b163d02ca77fdac39012afd7c1..63dac3f0edd0b4938b55c945001e07ceb270da25 100644 (file)
@@ -5,6 +5,7 @@ from __future__ import absolute_import, print_function
 import functools
 import logging
 import time
+import re
 
 import libcloud.common.types as cloud_types
 import pykka
@@ -16,6 +17,8 @@ from ...clientactor import _notify_subscribers
 from ... import config
 from .transitions import transitions
 
+QuotaExceeded = "QuotaExceeded"
+
 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
     """Base class for actors that change a compute node's state.
 
@@ -96,6 +99,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
+        self.error = None
         if arvados_node is None:
             self._later.create_arvados_node()
         else:
@@ -119,11 +123,23 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     def create_cloud_node(self):
         self._logger.info("Sending create_node request for node size %s.",
                           self.cloud_size.name)
-        self.cloud_node = self._cloud.create_node(self.cloud_size,
-                                                  self.arvados_node)
+        try:
+            self.cloud_node = self._cloud.create_node(self.cloud_size,
+                                                      self.arvados_node)
+        except Exception as e:
+            # The set of possible error codes / messages isn't documented for
+            # all clouds, so use a keyword heuristic to determine if the
+            # failure is likely due to a quota.
+            if re.search(r'(exceed|quota|limit)', e.message, re.I):
+                self.error = QuotaExceeded
+                self._logger.warning("Quota exceeded: %s", e)
+                self._finished()
+                return
+            else:
+                raise
 
         # The information included in the node size object we get from libcloud
-        # is inconsistent between cloud providers.  Replace libcloud NodeSize
+        # is inconsistent between cloud drivers.  Replace libcloud NodeSize
         # object with compatible CloudSizeWrapper object which merges the size
         # info reported from the cloud with size information from the
         # configuration file.
index 29b04845b653190f88c306094486ac030add3af7..442034170a4ffee4473eadb6c439e43f5e7e00d3 100644 (file)
@@ -25,7 +25,7 @@ class BaseComputeNodeDriver(RetryMixin):
     Subclasses must implement arvados_create_kwargs, sync_node,
     node_fqdn, and node_start_time.
     """
-    CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
+    CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError, BaseHTTPError)
 
     @RetryMixin._retry()
     def _create_driver(self, driver_class, **auth_kwargs):
@@ -211,11 +211,6 @@ class BaseComputeNodeDriver(RetryMixin):
         # libcloud compute drivers typically raise bare Exceptions to
         # represent API errors.  Return True for any exception that is
         # exactly an Exception, or a better-known higher-level exception.
-        if (type(exception) is BaseHTTPError and
-            exception.message and
-            (exception.message.startswith("InvalidInstanceID.NotFound") or
-             exception.message.startswith("InstanceLimitExceeded"))):
-            return True
         return (isinstance(exception, cls.CLOUD_ERRORS) or
                 type(exception) is Exception)
 
index 30e8995baa26bdcc22154570dfa099aa5658d341..f884295e37c7556976ce35ba186954936cc22ed4 100644 (file)
@@ -36,7 +36,10 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
         ConfigParser.SafeConfigParser.__init__(self, *args, **kwargs)
         for sec_name, settings in {
             'Arvados': {'insecure': 'no',
-                        'timeout': '15'},
+                        'timeout': '15',
+                        'jobs_queue': 'yes',
+                        'slurm_queue': 'yes'
+                    },
             'Daemon': {'min_nodes': '0',
                        'max_nodes': '1',
                        'poll_time': '60',
@@ -50,7 +53,7 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
             'Manage': {'address': '127.0.0.1',
                        'port': '-1'},
             'Logging': {'file': '/dev/stderr',
-                        'level': 'WARNING'},
+                        'level': 'WARNING'}
         }.iteritems():
             if not self.has_section(sec_name):
                 self.add_section(sec_name)
@@ -103,12 +106,19 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
     def new_cloud_client(self):
         module = importlib.import_module('arvnodeman.computenode.driver.' +
                                          self.get('Cloud', 'provider'))
+        driver_class = module.ComputeNodeDriver.DEFAULT_DRIVER
+        if self.has_option('Cloud', 'driver_class'):
+            d = self.get('Cloud', 'driver_class').split('.')
+            mod = '.'.join(d[:-1])
+            cls = d[-1]
+            driver_class = importlib.import_module(mod).__dict__[cls]
         auth_kwargs = self.get_section('Cloud Credentials')
         if 'timeout' in auth_kwargs:
             auth_kwargs['timeout'] = int(auth_kwargs['timeout'])
         return module.ComputeNodeDriver(auth_kwargs,
                                         self.get_section('Cloud List'),
-                                        self.get_section('Cloud Create'))
+                                        self.get_section('Cloud Create'),
+                                        driver_class=driver_class)
 
     def node_sizes(self, all_sizes):
         """Finds all acceptable NodeSizes for our installation.
index 9bfee79b59bae21968064b995e5cd87df7d7c7b9..c0413f626c0fe9ad3173af784e4f315baaaa289f 100644 (file)
@@ -122,6 +122,7 @@ class NodeManagerDaemonActor(actor_class):
         self.min_cloud_size = self.server_calculator.cheapest_size()
         self.min_nodes = min_nodes
         self.max_nodes = max_nodes
+        self.node_quota = max_nodes
         self.max_total_price = max_total_price
         self.poll_stale_after = poll_stale_after
         self.boot_fail_after = boot_fail_after
@@ -145,8 +146,8 @@ class NodeManagerDaemonActor(actor_class):
         self.last_polls[poll_key] = time.time()
 
     def _pair_nodes(self, node_record, arvados_node):
-        self._logger.info("Cloud node %s is now paired with Arvados node %s",
-                          node_record.cloud_node.name, arvados_node['uuid'])
+        self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
+                          node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
         self._arvados_nodes_actor.subscribe_to(
             arvados_node['uuid'], node_record.actor.update_arvados_node)
         node_record.arvados_node = arvados_node
@@ -298,16 +299,17 @@ class NodeManagerDaemonActor(actor_class):
     def _nodes_wanted(self, size):
         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
         under_min = self.min_nodes - total_node_count
-        over_max = total_node_count - self.max_nodes
+        over_max = total_node_count - self.node_quota
         total_price = self._total_price()
 
         counts = self._state_counts(size)
 
         up_count = self._nodes_up(counts)
         busy_count = counts["busy"]
+        wishlist_count = self._size_wishlist(size)
 
         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
-                          self._size_wishlist(size),
+                          wishlist_count,
                           up_count,
                           counts["booting"],
                           counts["unpaired"],
@@ -321,7 +323,7 @@ class NodeManagerDaemonActor(actor_class):
         elif under_min > 0 and size.id == self.min_cloud_size.id:
             return under_min
 
-        wanted = self._size_wishlist(size) - (up_count - busy_count)
+        wanted = wishlist_count - (up_count - busy_count)
         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
             can_boot = int((self.max_total_price - total_price) / size.price)
             if can_boot == 0:
@@ -392,25 +394,48 @@ class NodeManagerDaemonActor(actor_class):
         if arvados_node is not None:
             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                 time.time())
-        new_setup.subscribe(self._later.node_up)
+        new_setup.subscribe(self._later.node_setup_finished)
         if nodes_wanted > 1:
             self._later.start_node(cloud_size)
 
     def _get_actor_attrs(self, actor, *attr_names):
         return pykka.get_all([getattr(actor, name) for name in attr_names])
 
-    def node_up(self, setup_proxy):
+    def node_setup_finished(self, setup_proxy):
         # Called when a SetupActor has completed.
-        cloud_node, arvados_node = self._get_actor_attrs(
-            setup_proxy, 'cloud_node', 'arvados_node')
+        cloud_node, arvados_node, error = self._get_actor_attrs(
+            setup_proxy, 'cloud_node', 'arvados_node', 'error')
         setup_proxy.stop()
 
-        # If cloud_node is None then the node create wasn't
-        # successful and so there isn't anything to do.
-        if cloud_node is not None:
+        if cloud_node is None:
+            # If cloud_node is None then the node create wasn't successful.
+            if error == dispatch.QuotaExceeded:
+                # We've hit a quota limit, so adjust node_quota to stop trying to
+                # boot new nodes until the node count goes down.
+                self.node_quota = len(self.cloud_nodes)
+                self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
+        else:
             # Node creation succeeded.  Update cloud node list.
             cloud_node._nodemanager_recently_booted = True
             self._register_cloud_node(cloud_node)
+
+            # Different quota policies may be in force depending on the cloud
+            # provider, account limits, and the specific mix of node sizes
+            # that are already created.  If we are right at the quota limit,
+            # we want to probe to see if the last quota still applies or if we
+            # are allowed to create more nodes.
+            #
+            # For example, if the quota is actually based on core count, the
+            # quota might be 20 single-core machines or 10 dual-core machines.
+            # If we previously set node_quota to 10 dual core machines, but are
+            # now booting single core machines (actual quota 20), we want to
+            # allow the quota to expand so we don't get stuck at 10 machines
+            # forever.
+            if len(self.cloud_nodes) >= self.node_quota:
+                self.node_quota = len(self.cloud_nodes)+1
+                self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
+
+        self.node_quota = min(self.node_quota, self.max_nodes)
         del self.booting[setup_proxy.actor_ref.actor_urn]
         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
 
index 0340918e736549403d99469bad52f9f9c4a93154..1716a57d3f9763ac1f8303499e6baf893888394f 100644 (file)
@@ -112,9 +112,12 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
 
     CLIENT_ERRORS = ARVADOS_ERRORS
 
-    def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
+    def __init__(self, client, timer_actor, server_calc,
+                 jobs_queue, slurm_queue, *args, **kwargs):
         super(JobQueueMonitorActor, self).__init__(
             client, timer_actor, *args, **kwargs)
+        self.jobs_queue = jobs_queue
+        self.slurm_queue = slurm_queue
         self._calculator = server_calc
 
     @staticmethod
@@ -132,22 +135,27 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
             return int(x)
 
     def _send_request(self):
-        # cpus, memory, tempory disk space, reason, job name
-        squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
         queuelist = []
-        for out in squeue_out.splitlines():
-            cpu, ram, disk, reason, jobname = out.split("|", 4)
-            if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
-                queuelist.append({
-                    "uuid": jobname,
-                    "runtime_constraints": {
-                        "min_cores_per_node": cpu,
-                        "min_ram_mb_per_node": self.coerce_to_mb(ram),
-                        "min_scratch_mb_per_node": self.coerce_to_mb(disk)
-                    }
-                })
-
-        queuelist.extend(self._client.jobs().queue().execute()['items'])
+        if self.slurm_queue:
+            # cpus, memory, temporary disk space, reason, job name
+            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+            for out in squeue_out.splitlines():
+                try:
+                    cpu, ram, disk, reason, jobname = out.split("|", 4)
+                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
+                        queuelist.append({
+                            "uuid": jobname,
+                            "runtime_constraints": {
+                                "min_cores_per_node": cpu,
+                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
+                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+                            }
+                        })
+                except ValueError:
+                    pass
+
+        if self.jobs_queue:
+            queuelist.extend(self._client.jobs().queue().execute()['items'])
 
         return queuelist
 
index 11d38ecb76d22105b289d17cec5081aaa5bf3952..3cd097a628e827145c5a59157da31edf6c6b161c 100644 (file)
@@ -22,6 +22,7 @@ from .timedcallback import TimedCallBackActor
 from ._version import __version__
 
 node_daemon = None
+watchdog = None
 
 def abort(msg, code=1):
     print("arvados-node-manager: " + msg)
@@ -86,7 +87,10 @@ def launch_pollers(config, server_calculator):
         config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
     job_queue_poller = JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
-        poll_time, max_poll_time).tell_proxy()
+        config.getboolean('Arvados', 'jobs_queue'),
+        config.getboolean('Arvados', 'slurm_queue'),
+        poll_time, max_poll_time
+    ).tell_proxy()
     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
 
 _caught_signals = {}
@@ -97,6 +101,7 @@ def shutdown_signal(signal_code, frame):
         pykka.ActorRegistry.stop_all()
         sys.exit(-signal_code)
     elif current_count == 0:
+        watchdog.stop()
         node_daemon.shutdown()
     elif current_count == 1:
         pykka.ActorRegistry.stop_all()
@@ -104,7 +109,7 @@ def shutdown_signal(signal_code, frame):
         sys.exit(-signal_code)
 
 def main(args=None):
-    global node_daemon
+    global node_daemon, watchdog
     args = parse_cli(args)
     config = load_config(args.config)
 
@@ -138,7 +143,7 @@ def main(args=None):
             node_setup, node_shutdown, node_monitor,
             max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
 
-        WatchdogActor.start(config.getint('Daemon', 'watchdog'),
+        watchdog = WatchdogActor.start(config.getint('Daemon', 'watchdog'),
                             cloud_node_poller.actor_ref,
                             arvados_node_poller.actor_ref,
                             job_queue_poller.actor_ref,
index 6bf1a8b4de0ff9b03215d3c739d98dea888f32f7..7bc3a5ebd2a983bb798c1fda16367ebe911ff158 100644 (file)
@@ -29,16 +29,19 @@ class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
         sinfo_out = subprocess.check_output(["sinfo", "--noheader", "--format=%n %t"])
         nodestates = {}
         for out in sinfo_out.splitlines():
-            nodename, state = out.split(" ", 2)
-            if state in ('alloc', 'alloc*',
-                         'comp',  'comp*',
-                         'mix',   'mix*',
-                         'drng',  'drng*'):
-                nodestates[nodename] = 'busy'
-            elif state == 'idle':
-                nodestates[nodename] = 'idle'
-            else:
-                nodestates[nodename] = 'down'
+            try:
+                nodename, state = out.split(" ", 2)
+                if state in ('alloc', 'alloc*',
+                             'comp',  'comp*',
+                             'mix',   'mix*',
+                             'drng',  'drng*'):
+                    nodestates[nodename] = 'busy'
+                elif state == 'idle':
+                    nodestates[nodename] = 'idle'
+                else:
+                    nodestates[nodename] = 'down'
+            except ValueError:
+                pass
 
         for n in nodelist:
             if n["slot_number"] and n["hostname"] and n["hostname"] in nodestates:
diff --git a/services/nodemanager/arvnodeman/test/__init__.py b/services/nodemanager/arvnodeman/test/__init__.py
new file mode 100644 (file)
index 0000000..8b13789
--- /dev/null
@@ -0,0 +1 @@
+
diff --git a/services/nodemanager/arvnodeman/test/fake_driver.py b/services/nodemanager/arvnodeman/test/fake_driver.py
new file mode 100644 (file)
index 0000000..ee49305
--- /dev/null
@@ -0,0 +1,86 @@
+import re
+import urllib
+import ssl
+
+from libcloud.compute.base import NodeSize, Node, NodeDriver, NodeState
+from libcloud.common.exceptions import BaseHTTPError
+
+all_nodes = []
+create_calls = 0
+quota = 2
+
+class FakeDriver(NodeDriver):
+    def __init__(self, *args, **kwargs):
+        self.name = "FakeDriver"
+
+    def list_sizes(self, **kwargs):
+        return [NodeSize("Standard_D3", "Standard_D3", 3500, 200, 0, 0, self),
+                NodeSize("Standard_D4", "Standard_D4", 7000, 400, 0, 0, self)]
+
+    def list_nodes(self, **kwargs):
+        return all_nodes
+
+    def create_node(self, name=None,
+                    size=None,
+                    image=None,
+                    auth=None,
+                    ex_storage_account=None,
+                    ex_customdata=None,
+                    ex_resource_group=None,
+                    ex_user_name=None,
+                    ex_tags=None,
+                    ex_network=None):
+        global all_nodes, create_calls
+        create_calls += 1
+        n = Node(name, name, NodeState.RUNNING, [], [], self, size=size, extra={"tags": ex_tags})
+        all_nodes.append(n)
+        ping_url = re.search(r"echo '(.*)' > /var/tmp/arv-node-data/arv-ping-url", ex_customdata).groups(1)[0] + "&instance_id=" + name
+        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        ctx.verify_mode = ssl.CERT_NONE
+        f = urllib.urlopen(ping_url, "", context=ctx)
+        f.close()
+        return n
+
+    def destroy_node(self, cloud_node):
+        global all_nodes
+        all_nodes = [n for n in all_nodes if n.id != cloud_node.id]
+        return True
+
+    def get_image(self, img):
+        pass
+
+    def ex_create_tags(self, cloud_node, tags):
+        pass
+
+class QuotaDriver(FakeDriver):
+    def create_node(self, name=None,
+                    size=None,
+                    image=None,
+                    auth=None,
+                    ex_storage_account=None,
+                    ex_customdata=None,
+                    ex_resource_group=None,
+                    ex_user_name=None,
+                    ex_tags=None,
+                    ex_network=None):
+        global all_nodes, create_calls, quota
+        if len(all_nodes) >= quota:
+            raise BaseHTTPError(503, "Quota exceeded")
+        else:
+            return super(QuotaDriver, self).create_node(name=name,
+                    size=size,
+                    image=image,
+                    auth=auth,
+                    ex_storage_account=ex_storage_account,
+                    ex_customdata=ex_customdata,
+                    ex_resource_group=ex_resource_group,
+                    ex_user_name=ex_user_name,
+                    ex_tags=ex_tags,
+                    ex_network=ex_network)
+
+    def destroy_node(self, cloud_node):
+        global all_nodes, quota
+        all_nodes = [n for n in all_nodes if n.id != cloud_node.id]
+        if len(all_nodes) == 0:
+            quota = 4
+        return True
index 8d5b855cdb4370927991e0aadc4f077f920d9eaa..be1b3ab57169c9a4b210e4d35ec91c460806d29f 100644 (file)
@@ -93,6 +93,8 @@ apiclient = WARNING
 host = zyxwv.arvadosapi.com
 token = ARVADOS_TOKEN
 timeout = 15
+jobs_queue = yes   ; Get work request from Arvados jobs queue (jobs API)
+slurm_queue = yes  ; Get work request from squeue (containers API)
 
 # Accept an untrusted SSL certificate from the API server?
 insecure = no
index d5bed57b95811795ee83529935edf897f2339b18..cbc56f69c43e2740f15d6ceff0facbe3449e3b91 100644 (file)
@@ -93,6 +93,8 @@ apiclient = WARNING
 host = zyxwv.arvadosapi.com
 token = ARVADOS_TOKEN
 timeout = 15
+jobs_queue = yes   ; Get work request from Arvados jobs queue (jobs API)
+slurm_queue = yes  ; Get work request from squeue (containers API)
 
 # Accept an untrusted SSL certificate from the API server?
 insecure = no
index 043bb9567d04909ec848f027365e464a859b2a6b..39526e91b3848c01aecfa0db91883cf0348238db 100644 (file)
@@ -82,6 +82,8 @@ apiclient = WARNING
 host = zyxwv.arvadosapi.com
 token = ARVADOS_TOKEN
 timeout = 15
+jobs_queue = yes   ; Get work request from Arvados jobs queue (jobs API)
+slurm_queue = yes  ; Get work request from squeue (containers API)
 
 # Accept an untrusted SSL certificate from the API server?
 insecure = no
diff --git a/services/nodemanager/tests/fake.cfg.template b/services/nodemanager/tests/fake.cfg.template
new file mode 100644 (file)
index 0000000..eacd53f
--- /dev/null
@@ -0,0 +1,188 @@
+# Azure configuration for Arvados Node Manager.
+# All times are in seconds unless specified otherwise.
+
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
+[Daemon]
+# The dispatcher can customize the start and stop procedure for
+# cloud nodes.  For example, the SLURM dispatcher drains nodes
+# through SLURM before shutting them down.
+#dispatcher = slurm
+
+# Node Manager will ensure that there are at least this many nodes running at
+# all times.  If node manager needs to start new idle nodes for the purpose of
+# satisfying min_nodes, it will use the cheapest node type.  However, depending
+# on usage patterns, it may also satisfy min_nodes by keeping alive some
+# more-expensive nodes
+min_nodes = 0
+
+# Node Manager will not start any compute nodes when at least this
+# many are running.
+max_nodes = 8
+
+# Upper limit on rate of spending (in $/hr), will not boot additional nodes
+# if total price of already running nodes meets or exceeds this threshold.
+# default 0 means no limit.
+max_total_price = 0
+
+# Poll Azure nodes and Arvados for new information every N seconds.
+poll_time = 5
+
+# Polls have exponential backoff when services fail to respond.
+# This is the longest time to wait between polls.
+max_poll_time = 300
+
+# If Node Manager can't successfully poll a service for this long,
+# it will never start or stop compute nodes, on the assumption that its
+# information is too outdated.
+poll_stale_after = 600
+
+# If Node Manager boots a cloud node, and it does not pair with an Arvados
+# node before this long, assume that there was a cloud bootstrap failure and
+# shut it down.  Note that normal shutdown windows apply (see the Cloud
+# section), so this should be shorter than the first shutdown window value.
+boot_fail_after = 45
+
+# "Node stale time" affects two related behaviors.
+# 1. If a compute node has been running for at least this long, but it
+# isn't paired with an Arvados node, do not shut it down, but leave it alone.
+# This prevents the node manager from shutting down a node that might
+# actually be doing work, but is having temporary trouble contacting the
+# API server.
+# 2. When the Node Manager starts a new compute node, it will try to reuse
+# an Arvados node that hasn't been updated for this long.
+node_stale_after = 14400
+
+# Scaling factor to be applied to nodes' available RAM size. Usually there's a
+# variable discrepancy between the advertised RAM value on cloud nodes and the
+# actual amount available.
+# If not set, this value will be set to 0.95
+node_mem_scaling = 0.95
+
+# File path for Certificate Authorities
+certs_file = /etc/ssl/certs/ca-certificates.crt
+
+[Logging]
+# Log file path
+#file = node-manager.log
+
+# Log level for most Node Manager messages.
+# Choose one of DEBUG, INFO, WARNING, ERROR, or CRITICAL.
+# WARNING lets you know when polling a service fails.
+# INFO additionally lets you know when a compute node is started or stopped.
+level = DEBUG
+
+# You can also set different log levels for specific libraries.
+# Pykka is the Node Manager's actor library.
+# Setting this to DEBUG will display tracebacks for uncaught
+# exceptions in the actors, but it's also very chatty.
+pykka = WARNING
+
+# Setting apiclient to INFO will log the URL of every Arvados API request.
+apiclient = WARNING
+
+[Arvados]
+host = {host}
+token = {token}
+timeout = 15
+jobs_queue = no
+
+# Accept an untrusted SSL certificate from the API server?
+insecure = yes
+
+[Cloud]
+provider = azure
+driver_class = {driver_class}
+
+# Shutdown windows define periods of time when a node may and may not be shut
+# down.  These are windows in full minutes, separated by commas.  Counting from
+# the time the node is booted, the node WILL NOT shut down for N1 minutes; then
+# it MAY shut down for N2 minutes; then it WILL NOT shut down for N3 minutes;
+# and so on.  For example, "20, 999999" means the node may shut down between
+# the 20th and 999999th minutes of uptime.
+# Azure bills by the minute, so it makes sense to aggressively shut down idle
+# nodes.  Specify at least two windows.  You can add as many as you need beyond
+# that.
+shutdown_windows = 1, 999999
+
+[Cloud Credentials]
+# Use "azure account list" with the azure CLI to get these values.
+tenant_id = 00000000-0000-0000-0000-000000000000
+subscription_id = 00000000-0000-0000-0000-000000000000
+
+# The following directions are based on
+# https://azure.microsoft.com/en-us/documentation/articles/resource-group-authenticate-service-principal/
+#
+# azure config mode arm
+# azure ad app create --name "<Your Application Display Name>" --home-page "<https://YourApplicationHomePage>" --identifier-uris "<https://YouApplicationUri>" --password <Your_Password>
+# azure ad sp create "<Application_Id>"
+# azure role assignment create --objectId "<Object_Id>" -o Owner -c /subscriptions/<subscriptionId>/
+#
+# Use <Application_Id> for "key" and the <Your_Password> for "secret"
+#
+key = 00000000-0000-0000-0000-000000000000
+secret = PASSWORD
+timeout = 60
+region = East US
+
+[Cloud List]
+# The resource group in which the compute node virtual machines will be created
+# and listed.
+ex_resource_group = ArvadosResourceGroup
+
+[Cloud Create]
+# The image id, in the form "Publisher:Offer:SKU:Version"
+image = Canonical:UbuntuServer:14.04.3-LTS:14.04.201508050
+
+# Path to a local ssh key file that will be used to provision new nodes.
+ssh_key = {ssh_key}
+
+# The account name for the admin user that will be provisioned on new nodes.
+ex_user_name = arvadosuser
+
+# The Azure storage account that will be used to store the node OS disk images.
+ex_storage_account = arvadosstorage
+
+# The virtual network the VMs will be associated with.
+ex_network = ArvadosNetwork
+
+# Optional subnet of the virtual network.
+#ex_subnet = default
+
+# Node tags
+tag_arvados-class = dynamic-compute
+tag_cluster = zyxwv
+
+# the API server to ping
+ping_host = {host}
+
+# You can define any number of Size sections to list Azure sizes you're willing
+# to use.  The Node Manager should boot the cheapest size(s) that can run jobs
+# in the queue.  You must also provide price per hour as the Azure compute
+# driver currently does not report prices.
+#
+# See https://azure.microsoft.com/en-us/pricing/details/virtual-machines/
+# for a list of known machine types that may be used as a Size parameter.
+#
+# Each size section MUST define the number of cores available in this
+# size class (since libcloud does not provide any consistent API for exposing
+# this setting).
+# You may also want to define the amount of scratch space (expressed
+# in GB) for Crunch jobs.  You can also override Microsoft's provided
+# data fields by setting them here.
+
+[Size Standard_D3]
+cores = 4
+price = 0.56
+
+[Size Standard_D4]
+cores = 8
+price = 1.12
diff --git a/services/nodemanager/tests/integration_test.py b/services/nodemanager/tests/integration_test.py
new file mode 100755 (executable)
index 0000000..7823365
--- /dev/null
@@ -0,0 +1,325 @@
+#!/usr/bin/env python
+"""Integration test framework for node manager.
+
+Runs full node manager with an API server (needs ARVADOS_API_HOST and
+ARVADOS_API_TOKEN).  Stubs out the cloud driver and slurm commands to mock
+specific behaviors.  Monitors the log output to verify an expected sequence of
+events or behaviors for each test.
+
+"""
+
+import subprocess
+import os
+import sys
+import re
+import time
+import logging
+import stat
+import tempfile
+import shutil
+from functools import partial
+import arvados
+
+logging.basicConfig(level=logging.INFO)
+
+fake_slurm = None
+compute_nodes = None
+all_jobs = None
+
+def update_script(path, val):
+    with open(path+"_", "w") as f:
+        f.write(val)
+    os.chmod(path+"_", stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
+    os.rename(path+"_", path)
+    logging.info("Update script %s: %s", path, val)
+
+def set_squeue(g):
+    global all_jobs
+    update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
+                  "\n".join("echo '1|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+    return 0
+
+
+def node_paired(g):
+    global compute_nodes
+    compute_nodes[g.group(1)] = g.group(3)
+
+    update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
+                  "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+
+    for k,v in all_jobs.items():
+        if v == "ReqNodeNotAvail":
+            all_jobs[k] = "Running"
+            break
+
+    set_squeue(g)
+
+    return 0
+
+def remaining_jobs(g):
+    update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
+                  "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+
+    for k,v in all_jobs.items():
+        all_jobs[k] = "Running"
+
+    set_squeue(g)
+
+    return 0
+
+
+def node_busy(g):
+    update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
+                  "\n".join("echo '%s idle'" % (v) for k,v in compute_nodes.items()))
+    return 0
+
+def node_shutdown(g):
+    global compute_nodes
+    del compute_nodes[g.group(1)]
+    return 0
+
+def jobs_req(g):
+    global all_jobs
+    for k,v in all_jobs.items():
+        all_jobs[k] = "ReqNodeNotAvail"
+    set_squeue(g)
+    return 0
+
+def noop(g):
+    return 0
+
+def fail(checks, pattern, g):
+    return 1
+
+def expect_count(count, checks, pattern, g):
+    if count == 0:
+        return 1
+    else:
+        checks[pattern] = partial(expect_count, count-1)
+        return 0
+
+def run_test(name, actions, checks, driver_class, jobs):
+    code = 0
+
+    # Delete any stale node records
+    api = arvados.api('v1')
+    for n in api.nodes().list().execute()['items']:
+        api.nodes().delete(uuid=n["uuid"]).execute()
+
+    logging.info("Start %s", name)
+
+    global fake_slurm
+    fake_slurm = tempfile.mkdtemp()
+    logging.info("fake_slurm is %s", fake_slurm)
+
+    global compute_nodes
+    compute_nodes = {}
+
+    global all_jobs
+    all_jobs = jobs
+
+    env = os.environ.copy()
+    env["PATH"] = fake_slurm + ":" + env["PATH"]
+
+    # Reset fake squeue/sinfo to empty
+    update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n")
+    update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n")
+
+    # Write configuration file for test
+    with open("tests/fake.cfg.template") as f:
+        with open(os.path.join(fake_slurm, "id_rsa.pub"), "w") as ssh:
+            pass
+        with open(os.path.join(fake_slurm, "fake.cfg"), "w") as cfg:
+            cfg.write(f.read().format(host=os.environ["ARVADOS_API_HOST"],
+                                      token=os.environ["ARVADOS_API_TOKEN"],
+                                      driver_class=driver_class,
+                                      ssh_key=os.path.join(fake_slurm, "id_rsa.pub")))
+
+    # Tests must complete in less than 3 minutes.
+    timeout = time.time() + 180
+    terminated = False
+
+    # Now start node manager
+    p = subprocess.Popen(["bin/arvados-node-manager", "--foreground", "--config", os.path.join(fake_slurm, "fake.cfg")],
+                         bufsize=0, stderr=subprocess.PIPE, env=env)
+
+    # Test main loop:
+    # - Read line
+    # - Apply negative checks (things that are not supposed to happen)
+    # - Check timeout
+    # - Check if the next action should trigger
+    # - If all actions are exhausted, terminate with test success
+    # - If it hits timeout with actions remaining, terminate with test failed
+    try:
+        # naive line iteration over pipes gets buffered, which isn't what we want,
+        # see https://bugs.python.org/issue3907
+        for line in iter(p.stderr.readline, ""):
+            sys.stdout.write(line)
+
+            for k,v in checks.items():
+                g = re.match(k, line)
+                if g:
+                    logging.info("Matched check %s", k)
+                    code += v(checks, k, g)
+                    if code != 0:
+                        logging.error("Check failed")
+                        if not terminated:
+                            p.terminate()
+                            terminated = True
+
+            if terminated:
+                continue
+
+            if time.time() > timeout:
+                logging.error("Exceeded timeout with actions remaining: %s", actions)
+                code += 1
+                if not terminated:
+                    p.terminate()
+                    terminated = True
+
+            k, v = actions[0]
+            g = re.match(k, line)
+            if g:
+                logging.info("Matched action %s", k)
+                actions.pop(0)
+                code += v(g)
+                if code != 0:
+                    logging.error("Action failed")
+                    p.terminate()
+                    terminated = True
+
+            if not actions:
+                p.terminate()
+                terminated = True
+    except KeyboardInterrupt:
+        p.kill()
+
+    if actions:
+        logging.error("Ended with remaining actions: %s", actions)
+        code = 1
+
+    shutil.rmtree(fake_slurm)
+
+    if code == 0:
+        logging.info("%s passed", name)
+    else:
+        logging.info("%s failed", name)
+
+    return code
+
+
+def main():
+    # Test lifecycle.
+
+    tests = {
+        "test_single_node": (
+            [
+                (r".*Daemon started", set_squeue),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+            ], {
+                r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+                r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
+                r".*Setting node quota.*": fail,
+            },
+            "arvnodeman.test.fake_driver.FakeDriver",
+            {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"}),
+        "test_multiple_nodes": (
+            [
+                (r".*Daemon started", set_squeue),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+            ], {
+                r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+                r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 4),
+                r".*Setting node quota.*": fail,
+            },
+            "arvnodeman.test.fake_driver.FakeDriver",
+            {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+         }),
+        "test_hit_quota": (
+            [
+                (r".*Daemon started", set_squeue),
+                (r".*setting node quota to 3", noop),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown)
+            ], {
+                r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+                r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 2),
+                r".*Sending create_node request.*": partial(expect_count, 5)
+            },
+            "arvnodeman.test.fake_driver.QuotaDriver",
+            {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+         }),
+        "test_probe_quota": (
+            [
+                (r".*Daemon started", set_squeue),
+                (r".*setting node quota to 3", noop),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*sending request", jobs_req),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+            ], {
+                r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+                r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 6),
+                r".*Sending create_node request.*": partial(expect_count, 9)
+            },
+            "arvnodeman.test.fake_driver.QuotaDriver",
+            {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+         })
+    }
+
+    code = 0
+    if len(sys.argv) > 1:
+        code = run_test(sys.argv[1], *tests[sys.argv[1]])
+    else:
+        for t in sorted(tests.keys()):
+            code += run_test(t, *tests[t])
+
+    if code == 0:
+        logging.info("Tests passed")
+    else:
+        logging.info("Tests failed")
+
+    exit(code)
+
+if __name__ == '__main__':
+    main()
index c718dad6b6440919361a1bc6179cbb6610db7c88..b950cc1169c23b6990ae8cc4f0dd0aafe69914ba 100644 (file)
@@ -90,27 +90,27 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
         self.make_actor()
         self.wait_for_assignment(self.setup_actor, 'cloud_node')
 
-    def test_unknown_basehttperror_not_retried(self):
+    def test_basehttperror_retried(self):
         self.make_mocks()
         self.cloud_client.create_node.side_effect = [
-            BaseHTTPError(400, "Unknown"),
+            BaseHTTPError(500, "Try again"),
             self.cloud_client.create_node.return_value,
             ]
         self.make_actor()
-        finished = threading.Event()
-        self.setup_actor.subscribe(lambda _: finished.set())
-        assert(finished.wait(self.TIMEOUT))
-        self.assertEqual(0, self.cloud_client.post_create_node.call_count)
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.assertEqual(1, self.cloud_client.post_create_node.call_count)
 
-    def test_known_basehttperror_retried(self):
+    def test_instance_exceeded_not_retried(self):
         self.make_mocks()
         self.cloud_client.create_node.side_effect = [
             BaseHTTPError(400, "InstanceLimitExceeded"),
             self.cloud_client.create_node.return_value,
             ]
         self.make_actor()
-        self.wait_for_assignment(self.setup_actor, 'cloud_node')
-        self.assertEqual(1, self.cloud_client.post_create_node.call_count)
+        done = self.FUTURE_CLASS()
+        self.setup_actor.subscribe(done.set)
+        done.get(self.TIMEOUT)
+        self.assertEqual(0, self.cloud_client.post_create_node.call_count)
 
     def test_failed_post_create_retried(self):
         self.make_mocks()
index 04ff9b6d79962922ea8a3327edc726db528b524e..c8af72a8d0bf707bdddb68e14934b5174346dd03 100644 (file)
@@ -305,7 +305,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(4)
         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
         setup = self.start_node_boot(cloud_node, arv_node)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_arvados_nodes([arv_node])
         self.daemon.update_cloud_nodes([cloud_node])
@@ -322,13 +322,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(1)
         setup = self.start_node_boot(cloud_node)
         self.daemon.update_cloud_nodes([cloud_node])
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
 
     def test_no_duplication_when_booted_node_listed(self):
         cloud_node = testutil.cloud_node_mock(2)
         setup = self.start_node_boot(cloud_node, id_num=2)
-        self.daemon.node_up(setup)
+        self.daemon.node_setup_finished(setup)
         self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
 
@@ -337,14 +337,14 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         # even it doesn't appear in the listing (e.g., because of delays
         # propagating tags).
         setup = self.start_node_boot()
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
 
     def test_booted_unlisted_node_counted(self):
         setup = self.start_node_boot(id_num=1)
-        self.daemon.node_up(setup)
+        self.daemon.node_setup_finished(setup)
         self.daemon.update_server_wishlist(
             [testutil.MockSize(1)]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -352,7 +352,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
     def test_booted_node_can_shutdown(self):
         setup = self.start_node_boot()
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.update_server_wishlist([])
@@ -370,7 +370,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_booted_node_lifecycle(self):
         cloud_node = testutil.cloud_node_mock(6)
         setup = self.start_node_boot(cloud_node, id_num=6)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.update_server_wishlist([])
@@ -393,7 +393,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_booted_node_shut_down_when_never_listed(self):
         setup = self.start_node_boot()
         self.cloud_factory().node_start_time.return_value = time.time() - 3601
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.assertFalse(self.node_shutdown.start.called)
         now = time.time()
@@ -406,7 +406,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(2)
         setup = self.start_node_boot(cloud_node)
         self.cloud_factory().node_start_time.return_value = time.time() - 3601
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_cloud_nodes([cloud_node])
         self.monitor_list()[0].tell_proxy().consider_shutdown()
@@ -419,7 +419,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
         setup = self.start_node_boot(cloud_node, arv_node)
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
         self.daemon.update_cloud_nodes([cloud_node])
@@ -431,7 +431,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(3)
         arv_node = testutil.arvados_node_mock(3)
         setup = self.start_node_boot(cloud_node, arv_node)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_cloud_nodes([cloud_node])
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
@@ -443,7 +443,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(5)
         arv_node = testutil.arvados_node_mock(5, job_uuid=True)
         setup = self.start_node_boot(cloud_node, arv_node)
-        self.daemon.node_up(setup).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
         self.assertEqual(1, self.alive_monitor_count())
         self.daemon.update_cloud_nodes([cloud_node])
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
@@ -581,7 +581,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         new_node.stop_if_no_cloud_node.reset_mock()
         self.daemon.shutdown().get(self.TIMEOUT)
         self.assertTrue(new_node.stop_if_no_cloud_node.called)
-        self.daemon.node_up(new_node).get(self.TIMEOUT)
+        self.daemon.node_setup_finished(new_node).get(self.TIMEOUT)
         self.assertTrue(new_node.stop.called)
         self.timer.deliver()
         self.assertTrue(
index 554ff1eb79724d48dd8e584f490420bdcac91579..9f7730be15ba2fff906c7b88b840361143a2bfbf 100644 (file)
@@ -142,7 +142,7 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
     def test_subscribers_get_server_lists(self, mock_squeue):
         mock_squeue.return_value = ""
 
-        self.build_monitor([{'items': [1, 2]}], self.MockCalculator())
+        self.build_monitor([{'items': [1, 2]}], self.MockCalculator(), True, True)
         self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
         self.stop_proxy(self.monitor)
         self.subscriber.assert_called_with([testutil.MockSize(1),
@@ -155,7 +155,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 """
 
         super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
-            [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]))
+            [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]),
+                                                                True, True)
         self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
         self.stop_proxy(self.monitor)
         self.subscriber.assert_called_with([testutil.MockSize(1),
@@ -168,7 +169,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 """
 
         super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
-            [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]))
+            [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]),
+                                                                True, True)
         self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
         self.stop_proxy(self.monitor)
         self.subscriber.assert_called_with([testutil.MockSize(1),