services/keep-balance
services/login-sync
services/nodemanager
+services/nodemanager-integration
services/crunch-run
services/crunch-dispatch-local
services/crunch-dispatch-slurm
}
do_test services/login-sync login-sync
+test_nodemanager-integration() {
+ cd "$WORKSPACE/services/nodemanager" \
+ && tests/integration_test.py ${testargs[services/nodemanager-integration]}
+}
+do_test services/nodemanager-integration nodemanager-integration
+
for p in "${pythonstuff[@]}"
do
dir=${p%:py3}
import functools
import logging
import time
+import re
import libcloud.common.types as cloud_types
import pykka
from ... import config
from .transitions import transitions
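+# Sentinel stored in a setup actor's .error attribute when node creation
+# fails because the cloud reports a quota or limit error.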
+QuotaExceeded = "QuotaExceeded"
+
class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
"""Base class for actors that change a compute node's state.
self.cloud_size = cloud_size
self.arvados_node = None
self.cloud_node = None
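+        # Set to QuotaExceeded when create_node fails with a quota error;
+        # the daemon checks this attribute after the setup actor finishes.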
+ self.error = None
if arvados_node is None:
self._later.create_arvados_node()
else:
def create_cloud_node(self):
self._logger.info("Sending create_node request for node size %s.",
self.cloud_size.name)
- self.cloud_node = self._cloud.create_node(self.cloud_size,
- self.arvados_node)
+ try:
+ self.cloud_node = self._cloud.create_node(self.cloud_size,
+ self.arvados_node)
+ except Exception as e:
+ # The set of possible error codes / messages isn't documented for
+ # all clouds, so use a keyword heuristic to determine if the
+ # failure is likely due to a quota.
+ if re.search(r'(exceed|quota|limit)', e.message, re.I):
+ self.error = QuotaExceeded
+ self._logger.warning("Quota exceeded: %s", e)
+ self._finished()
+ return
+ else:
+ raise
# The information included in the node size object we get from libcloud
- # is inconsistent between cloud providers. Replace libcloud NodeSize
+ # is inconsistent between cloud drivers. Replace libcloud NodeSize
# object with compatible CloudSizeWrapper object which merges the size
# info reported from the cloud with size information from the
# configuration file.
Subclasses must implement arvados_create_kwargs, sync_node,
node_fqdn, and node_start_time.
"""
- CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
+ CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError, BaseHTTPError)
@RetryMixin._retry()
def _create_driver(self, driver_class, **auth_kwargs):
# libcloud compute drivers typically raise bare Exceptions to
# represent API errors. Return True for any exception that is
# exactly an Exception, or a better-known higher-level exception.
- if (type(exception) is BaseHTTPError and
- exception.message and
- (exception.message.startswith("InvalidInstanceID.NotFound") or
- exception.message.startswith("InstanceLimitExceeded"))):
- return True
return (isinstance(exception, cls.CLOUD_ERRORS) or
type(exception) is Exception)
ConfigParser.SafeConfigParser.__init__(self, *args, **kwargs)
for sec_name, settings in {
'Arvados': {'insecure': 'no',
- 'timeout': '15'},
+ 'timeout': '15',
+ 'jobs_queue': 'yes',
+ 'slurm_queue': 'yes'
+ },
'Daemon': {'min_nodes': '0',
'max_nodes': '1',
'poll_time': '60',
'Manage': {'address': '127.0.0.1',
'port': '-1'},
'Logging': {'file': '/dev/stderr',
- 'level': 'WARNING'},
+ 'level': 'WARNING'}
}.iteritems():
if not self.has_section(sec_name):
self.add_section(sec_name)
def new_cloud_client(self):
module = importlib.import_module('arvnodeman.computenode.driver.' +
self.get('Cloud', 'provider'))
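+        # Use the provider module's default libcloud driver unless the
+        # optional Cloud/driver_class setting names an override (the
+        # integration tests use this to load a fake driver).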
+ driver_class = module.ComputeNodeDriver.DEFAULT_DRIVER
+ if self.has_option('Cloud', 'driver_class'):
+            d = self.get('Cloud', 'driver_class').split('.')
+            mod = '.'.join(d[:-1])
+            cls = d[-1]
+            driver_class = getattr(importlib.import_module(mod), cls)
auth_kwargs = self.get_section('Cloud Credentials')
if 'timeout' in auth_kwargs:
auth_kwargs['timeout'] = int(auth_kwargs['timeout'])
return module.ComputeNodeDriver(auth_kwargs,
self.get_section('Cloud List'),
- self.get_section('Cloud Create'))
+ self.get_section('Cloud Create'),
+ driver_class=driver_class)
def node_sizes(self, all_sizes):
"""Finds all acceptable NodeSizes for our installation.
self.min_cloud_size = self.server_calculator.cheapest_size()
self.min_nodes = min_nodes
self.max_nodes = max_nodes
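+        # node_quota is the dynamic booting limit: it starts at max_nodes,
+        # drops when the cloud reports a quota error, and is probed back up
+        # after successful boots (see node_setup_finished).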
+ self.node_quota = max_nodes
self.max_total_price = max_total_price
self.poll_stale_after = poll_stale_after
self.boot_fail_after = boot_fail_after
self.last_polls[poll_key] = time.time()
def _pair_nodes(self, node_record, arvados_node):
- self._logger.info("Cloud node %s is now paired with Arvados node %s",
- node_record.cloud_node.name, arvados_node['uuid'])
+ self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
+ node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
self._arvados_nodes_actor.subscribe_to(
arvados_node['uuid'], node_record.actor.update_arvados_node)
node_record.arvados_node = arvados_node
def _nodes_wanted(self, size):
total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
under_min = self.min_nodes - total_node_count
- over_max = total_node_count - self.max_nodes
+ over_max = total_node_count - self.node_quota
total_price = self._total_price()
counts = self._state_counts(size)
up_count = self._nodes_up(counts)
busy_count = counts["busy"]
+ wishlist_count = self._size_wishlist(size)
self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
- self._size_wishlist(size),
+ wishlist_count,
up_count,
counts["booting"],
counts["unpaired"],
elif under_min > 0 and size.id == self.min_cloud_size.id:
return under_min
- wanted = self._size_wishlist(size) - (up_count - busy_count)
+ wanted = wishlist_count - (up_count - busy_count)
if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
can_boot = int((self.max_total_price - total_price) / size.price)
if can_boot == 0:
if arvados_node is not None:
self.arvados_nodes[arvados_node['uuid']].assignment_time = (
time.time())
- new_setup.subscribe(self._later.node_up)
+ new_setup.subscribe(self._later.node_setup_finished)
if nodes_wanted > 1:
self._later.start_node(cloud_size)
def _get_actor_attrs(self, actor, *attr_names):
return pykka.get_all([getattr(actor, name) for name in attr_names])
- def node_up(self, setup_proxy):
+ def node_setup_finished(self, setup_proxy):
# Called when a SetupActor has completed.
- cloud_node, arvados_node = self._get_actor_attrs(
- setup_proxy, 'cloud_node', 'arvados_node')
+ cloud_node, arvados_node, error = self._get_actor_attrs(
+ setup_proxy, 'cloud_node', 'arvados_node', 'error')
setup_proxy.stop()
- # If cloud_node is None then the node create wasn't
- # successful and so there isn't anything to do.
- if cloud_node is not None:
+ if cloud_node is None:
+ # If cloud_node is None then the node create wasn't successful.
+ if error == dispatch.QuotaExceeded:
+ # We've hit a quota limit, so adjust node_quota to stop trying to
+ # boot new nodes until the node count goes down.
+ self.node_quota = len(self.cloud_nodes)
+ self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
+ else:
# Node creation succeeded. Update cloud node list.
cloud_node._nodemanager_recently_booted = True
self._register_cloud_node(cloud_node)
+
+            # Different quota policies may be in force depending on the cloud
+            # provider, account limits, and the specific mix of node sizes
+ # that are already created. If we are right at the quota limit,
+ # we want to probe to see if the last quota still applies or if we
+ # are allowed to create more nodes.
+ #
+ # For example, if the quota is actually based on core count, the
+ # quota might be 20 single-core machines or 10 dual-core machines.
+ # If we previously set node_quota to 10 dual core machines, but are
+ # now booting single core machines (actual quota 20), we want to
+ # allow the quota to expand so we don't get stuck at 10 machines
+ # forever.
+ if len(self.cloud_nodes) >= self.node_quota:
+ self.node_quota = len(self.cloud_nodes)+1
+ self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
+
+ self.node_quota = min(self.node_quota, self.max_nodes)
del self.booting[setup_proxy.actor_ref.actor_urn]
del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
CLIENT_ERRORS = ARVADOS_ERRORS
- def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
+ def __init__(self, client, timer_actor, server_calc,
+ jobs_queue, slurm_queue, *args, **kwargs):
super(JobQueueMonitorActor, self).__init__(
client, timer_actor, *args, **kwargs)
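+        # Booleans from the [Arvados] config section selecting which work
+        # queues to poll: the Arvados jobs API and/or the SLURM queue.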
+ self.jobs_queue = jobs_queue
+ self.slurm_queue = slurm_queue
self._calculator = server_calc
@staticmethod
return int(x)
def _send_request(self):
- # cpus, memory, tempory disk space, reason, job name
- squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
queuelist = []
- for out in squeue_out.splitlines():
- cpu, ram, disk, reason, jobname = out.split("|", 4)
- if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
- queuelist.append({
- "uuid": jobname,
- "runtime_constraints": {
- "min_cores_per_node": cpu,
- "min_ram_mb_per_node": self.coerce_to_mb(ram),
- "min_scratch_mb_per_node": self.coerce_to_mb(disk)
- }
- })
-
- queuelist.extend(self._client.jobs().queue().execute()['items'])
+ if self.slurm_queue:
+            # cpus, memory, temporary disk space, reason, job name
+ squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+ for out in squeue_out.splitlines():
+ try:
+ cpu, ram, disk, reason, jobname = out.split("|", 4)
+ if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
+ queuelist.append({
+ "uuid": jobname,
+ "runtime_constraints": {
+ "min_cores_per_node": cpu,
+ "min_ram_mb_per_node": self.coerce_to_mb(ram),
+ "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+ }
+ })
+ except ValueError:
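+                    # Skip malformed squeue lines instead of aborting the poll.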
+ pass
+
+ if self.jobs_queue:
+ queuelist.extend(self._client.jobs().queue().execute()['items'])
return queuelist
from ._version import __version__
node_daemon = None
+watchdog = None
def abort(msg, code=1):
print("arvados-node-manager: " + msg)
config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
job_queue_poller = JobQueueMonitorActor.start(
config.new_arvados_client(), timer, server_calculator,
- poll_time, max_poll_time).tell_proxy()
+ config.getboolean('Arvados', 'jobs_queue'),
+ config.getboolean('Arvados', 'slurm_queue'),
+ poll_time, max_poll_time
+ ).tell_proxy()
return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
_caught_signals = {}
pykka.ActorRegistry.stop_all()
sys.exit(-signal_code)
elif current_count == 0:
+ watchdog.stop()
node_daemon.shutdown()
elif current_count == 1:
pykka.ActorRegistry.stop_all()
sys.exit(-signal_code)
def main(args=None):
- global node_daemon
+ global node_daemon, watchdog
args = parse_cli(args)
config = load_config(args.config)
node_setup, node_shutdown, node_monitor,
max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
- WatchdogActor.start(config.getint('Daemon', 'watchdog'),
+ watchdog = WatchdogActor.start(config.getint('Daemon', 'watchdog'),
cloud_node_poller.actor_ref,
arvados_node_poller.actor_ref,
job_queue_poller.actor_ref,
sinfo_out = subprocess.check_output(["sinfo", "--noheader", "--format=%n %t"])
nodestates = {}
for out in sinfo_out.splitlines():
- nodename, state = out.split(" ", 2)
- if state in ('alloc', 'alloc*',
- 'comp', 'comp*',
- 'mix', 'mix*',
- 'drng', 'drng*'):
- nodestates[nodename] = 'busy'
- elif state == 'idle':
- nodestates[nodename] = 'idle'
- else:
- nodestates[nodename] = 'down'
+ try:
+ nodename, state = out.split(" ", 2)
+ if state in ('alloc', 'alloc*',
+ 'comp', 'comp*',
+ 'mix', 'mix*',
+ 'drng', 'drng*'):
+ nodestates[nodename] = 'busy'
+ elif state == 'idle':
+ nodestates[nodename] = 'idle'
+ else:
+ nodestates[nodename] = 'down'
+ except ValueError:
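+        # Skip sinfo lines that don't split into "nodename state".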
+ pass
for n in nodelist:
if n["slot_number"] and n["hostname"] and n["hostname"] in nodestates:
--- /dev/null
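+"""Fake libcloud drivers used by the node manager integration tests.
+
+FakeDriver answers create/list/destroy calls without contacting a real
+cloud; QuotaDriver additionally simulates a provider-side quota limit.
+"""
+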
+import re
+import urllib
+import ssl
+
+from libcloud.compute.base import NodeSize, Node, NodeDriver, NodeState
+from libcloud.common.exceptions import BaseHTTPError
+
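+# Module-level state shared by all driver instances in the test process.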
+all_nodes = []
+create_calls = 0
+quota = 2
+
+class FakeDriver(NodeDriver):
+ def __init__(self, *args, **kwargs):
+ self.name = "FakeDriver"
+
+ def list_sizes(self, **kwargs):
+ return [NodeSize("Standard_D3", "Standard_D3", 3500, 200, 0, 0, self),
+ NodeSize("Standard_D4", "Standard_D4", 7000, 400, 0, 0, self)]
+
+ def list_nodes(self, **kwargs):
+ return all_nodes
+
+ def create_node(self, name=None,
+ size=None,
+ image=None,
+ auth=None,
+ ex_storage_account=None,
+ ex_customdata=None,
+ ex_resource_group=None,
+ ex_user_name=None,
+ ex_tags=None,
+ ex_network=None):
+ global all_nodes, create_calls
+ create_calls += 1
+ n = Node(name, name, NodeState.RUNNING, [], [], self, size=size, extra={"tags": ex_tags})
+ all_nodes.append(n)
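+        # Extract the ping URL from the node's customdata boot script and hit
+        # it, simulating the new node phoning home to the API server so it
+        # can be paired with an Arvados node record.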
+        ping_url = re.search(r"echo '(.*)' > /var/tmp/arv-node-data/arv-ping-url", ex_customdata).group(1) + "&instance_id=" + name
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_NONE
+ f = urllib.urlopen(ping_url, "", context=ctx)
+ f.close()
+ return n
+
+ def destroy_node(self, cloud_node):
+ global all_nodes
+ all_nodes = [n for n in all_nodes if n.id != cloud_node.id]
+ return True
+
+ def get_image(self, img):
+ pass
+
+ def ex_create_tags(self, cloud_node, tags):
+ pass
+
+class QuotaDriver(FakeDriver):
+ def create_node(self, name=None,
+ size=None,
+ image=None,
+ auth=None,
+ ex_storage_account=None,
+ ex_customdata=None,
+ ex_resource_group=None,
+ ex_user_name=None,
+ ex_tags=None,
+ ex_network=None):
+ global all_nodes, create_calls, quota
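+        # Simulate a provider-side quota: refuse creation once the node count
+        # reaches the fake limit.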
+ if len(all_nodes) >= quota:
+ raise BaseHTTPError(503, "Quota exceeded")
+ else:
+ return super(QuotaDriver, self).create_node(name=name,
+ size=size,
+ image=image,
+ auth=auth,
+ ex_storage_account=ex_storage_account,
+ ex_customdata=ex_customdata,
+ ex_resource_group=ex_resource_group,
+ ex_user_name=ex_user_name,
+ ex_tags=ex_tags,
+ ex_network=ex_network)
+
+ def destroy_node(self, cloud_node):
+ global all_nodes, quota
+ all_nodes = [n for n in all_nodes if n.id != cloud_node.id]
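+        # Once every node is gone, raise the quota so tests can verify that
+        # node manager probes past a previously-hit limit.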
+ if len(all_nodes) == 0:
+ quota = 4
+ return True
host = zyxwv.arvadosapi.com
token = ARVADOS_TOKEN
timeout = 15
+jobs_queue = yes # Get work requests from the Arvados jobs queue (jobs API)
+slurm_queue = yes # Get work requests from squeue (containers API)
# Accept an untrusted SSL certificate from the API server?
insecure = no
host = zyxwv.arvadosapi.com
token = ARVADOS_TOKEN
timeout = 15
+jobs_queue = yes # Get work requests from the Arvados jobs queue (jobs API)
+slurm_queue = yes # Get work requests from squeue (containers API)
# Accept an untrusted SSL certificate from the API server?
insecure = no
host = zyxwv.arvadosapi.com
token = ARVADOS_TOKEN
timeout = 15
+jobs_queue = yes # Get work requests from the Arvados jobs queue (jobs API)
+slurm_queue = yes # Get work requests from squeue (containers API)
# Accept an untrusted SSL certificate from the API server?
insecure = no
--- /dev/null
+# Azure configuration for Arvados Node Manager.
+# All times are in seconds unless specified otherwise.
+
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
+[Daemon]
+# The dispatcher can customize the start and stop procedure for
+# cloud nodes. For example, the SLURM dispatcher drains nodes
+# through SLURM before shutting them down.
+#dispatcher = slurm
+
+# Node Manager will ensure that there are at least this many nodes running at
+# all times. If Node Manager needs to start new idle nodes for the purpose of
+# satisfying min_nodes, it will use the cheapest node type. However, depending
+# on usage patterns, it may also satisfy min_nodes by keeping alive some
+# more-expensive nodes.
+min_nodes = 0
+
+# Node Manager will not start any compute nodes when at least this
+# many are running.
+max_nodes = 8
+
+# Upper limit on the rate of spending (in $/hr). Node Manager will not boot
+# additional nodes if the total price of the nodes already running meets or
+# exceeds this threshold. The default of 0 means no limit.
+max_total_price = 0
+
+# Poll Azure nodes and Arvados for new information every N seconds.
+poll_time = 5
+
+# Polls have exponential backoff when services fail to respond.
+# This is the longest time to wait between polls.
+max_poll_time = 300
+
+# If Node Manager can't successfully poll a service for this long,
+# it will never start or stop compute nodes, on the assumption that its
+# information is too outdated.
+poll_stale_after = 600
+
+# If Node Manager boots a cloud node, and it does not pair with an Arvados
+# node before this long, assume that there was a cloud bootstrap failure and
+# shut it down. Note that normal shutdown windows apply (see the Cloud
+# section), so this should be shorter than the first shutdown window value.
+boot_fail_after = 45
+
+# "Node stale time" affects two related behaviors.
+# 1. If a compute node has been running for at least this long, but it
+# isn't paired with an Arvados node, do not shut it down, but leave it alone.
+# This prevents the node manager from shutting down a node that might
+# actually be doing work, but is having temporary trouble contacting the
+# API server.
+# 2. When the Node Manager starts a new compute node, it will try to reuse
+# an Arvados node that hasn't been updated for this long.
+node_stale_after = 14400
+
+# Scaling factor to be applied to nodes' available RAM size. Usually there's a
+# variable discrepancy between the advertised RAM value on cloud nodes and the
+# actual amount available.
+# If not set, this value defaults to 0.95.
+node_mem_scaling = 0.95
+
+# File path for Certificate Authorities
+certs_file = /etc/ssl/certs/ca-certificates.crt
+
+[Logging]
+# Log file path
+#file = node-manager.log
+
+# Log level for most Node Manager messages.
+# Choose one of DEBUG, INFO, WARNING, ERROR, or CRITICAL.
+# WARNING lets you know when polling a service fails.
+# INFO additionally lets you know when a compute node is started or stopped.
+level = DEBUG
+
+# You can also set different log levels for specific libraries.
+# Pykka is the Node Manager's actor library.
+# Setting this to DEBUG will display tracebacks for uncaught
+# exceptions in the actors, but it's also very chatty.
+pykka = WARNING
+
+# Setting apiclient to INFO will log the URL of every Arvados API request.
+apiclient = WARNING
+
+[Arvados]
+host = {host}
+token = {token}
+timeout = 15
+jobs_queue = no
+
+# Accept an untrusted SSL certificate from the API server?
+insecure = yes
+
+[Cloud]
+provider = azure
+driver_class = {driver_class}
+
+# Shutdown windows define periods of time when a node may and may not be shut
+# down. These are windows in full minutes, separated by commas. Counting from
+# the time the node is booted, the node WILL NOT shut down for N1 minutes; then
+# it MAY shut down for N2 minutes; then it WILL NOT shut down for N3 minutes;
+# and so on. For example, "20, 999999" means the node may shut down between
+# the 20th and 999999th minutes of uptime.
+# Azure bills by the minute, so it makes sense to aggressively shut down idle
+# nodes. Specify at least two windows. You can add as many as you need beyond
+# that.
+shutdown_windows = 1, 999999
+
+[Cloud Credentials]
+# Use "azure account list" with the azure CLI to get these values.
+tenant_id = 00000000-0000-0000-0000-000000000000
+subscription_id = 00000000-0000-0000-0000-000000000000
+
+# The following directions are based on
+# https://azure.microsoft.com/en-us/documentation/articles/resource-group-authenticate-service-principal/
+#
+# azure config mode arm
+# azure ad app create --name "<Your Application Display Name>" --home-page "<https://YourApplicationHomePage>" --identifier-uris "<https://YouApplicationUri>" --password <Your_Password>
+# azure ad sp create "<Application_Id>"
+# azure role assignment create --objectId "<Object_Id>" -o Owner -c /subscriptions/<subscriptionId>/
+#
+# Use <Application_Id> for "key" and <Your_Password> for "secret".
+#
+key = 00000000-0000-0000-0000-000000000000
+secret = PASSWORD
+timeout = 60
+region = East US
+
+[Cloud List]
+# The resource group in which the compute node virtual machines will be created
+# and listed.
+ex_resource_group = ArvadosResourceGroup
+
+[Cloud Create]
+# The image id, in the form "Publisher:Offer:SKU:Version"
+image = Canonical:UbuntuServer:14.04.3-LTS:14.04.201508050
+
+# Path to a local ssh key file that will be used to provision new nodes.
+ssh_key = {ssh_key}
+
+# The account name for the admin user that will be provisioned on new nodes.
+ex_user_name = arvadosuser
+
+# The Azure storage account that will be used to store the node OS disk images.
+ex_storage_account = arvadosstorage
+
+# The virtual network the VMs will be associated with.
+ex_network = ArvadosNetwork
+
+# Optional subnet of the virtual network.
+#ex_subnet = default
+
+# Node tags
+tag_arvados-class = dynamic-compute
+tag_cluster = zyxwv
+
+# The API server to ping.
+ping_host = {host}
+
+# You can define any number of Size sections to list Azure sizes you're willing
+# to use. The Node Manager should boot the cheapest size(s) that can run jobs
+# in the queue. You must also provide the price per hour, as the Azure
+# compute driver currently does not report prices.
+#
+# See https://azure.microsoft.com/en-us/pricing/details/virtual-machines/
+# for a list of known machine types that may be used as a Size parameter.
+#
+# Each size section MUST define the number of cores available in this
+# size class (since libcloud does not provide any consistent API for exposing
+# this setting).
+# You may also want to define the amount of scratch space (expressed
+# in GB) for Crunch jobs. You can also override Microsoft's provided
+# data fields by setting them here.
+
+[Size Standard_D3]
+cores = 4
+price = 0.56
+
+[Size Standard_D4]
+cores = 8
+price = 1.12
--- /dev/null
+#!/usr/bin/env python
+"""Integration test framework for node manager.
+
+Runs the full node manager with an API server (needs ARVADOS_API_HOST and
+ARVADOS_API_TOKEN). Stubs out the cloud driver and slurm commands to mock
+specific behaviors. Monitors the log output to verify an expected sequence of
+events or behaviors for each test.
+
+"""
+
+import subprocess
+import os
+import sys
+import re
+import time
+import logging
+import stat
+import tempfile
+import shutil
+from functools import partial
+import arvados
+
+logging.basicConfig(level=logging.INFO)
+
+fake_slurm = None
+compute_nodes = None
+all_jobs = None
+
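+# Atomically replace a fake command script: write to a temporary name, make
+# it executable, then rename it over the old path.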
+def update_script(path, val):
+ with open(path+"_", "w") as f:
+ f.write(val)
+ os.chmod(path+"_", stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
+ os.rename(path+"_", path)
+ logging.info("Update script %s: %s", path, val)
+
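+# Rewrite the fake squeue script so it reports each job in all_jobs using the
+# "%c|%m|%d|%r|%j" field layout that JobQueueMonitorActor parses.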
+def set_squeue(g):
+ global all_jobs
+ update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
+ "\n".join("echo '1|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+ return 0
+
+
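+# On pairing: record the new cloud node, mark all known nodes allocated in
+# the fake sinfo output, and flip one pending job to Running.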
+def node_paired(g):
+ global compute_nodes
+ compute_nodes[g.group(1)] = g.group(3)
+
+ update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
+ "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+
+ for k,v in all_jobs.items():
+ if v == "ReqNodeNotAvail":
+ all_jobs[k] = "Running"
+ break
+
+ set_squeue(g)
+
+ return 0
+
+def remaining_jobs(g):
+ update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
+ "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+
+ for k,v in all_jobs.items():
+ all_jobs[k] = "Running"
+
+ set_squeue(g)
+
+ return 0
+
+
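+# Triggered once a node reports busy: flip the fake sinfo to idle so the
+# next poll makes the node eligible for shutdown.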
+def node_busy(g):
+ update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
+ "\n".join("echo '%s idle'" % (v) for k,v in compute_nodes.items()))
+ return 0
+
+def node_shutdown(g):
+ global compute_nodes
+ del compute_nodes[g.group(1)]
+ return 0
+
+def jobs_req(g):
+ global all_jobs
+ for k,v in all_jobs.items():
+ all_jobs[k] = "ReqNodeNotAvail"
+ set_squeue(g)
+ return 0
+
+def noop(g):
+ return 0
+
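+# Negative check: any match of this pattern fails the test.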
+def fail(checks, pattern, g):
+ return 1
+
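+# Allow a pattern to match at most `count` times; one more match fails.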
+def expect_count(count, checks, pattern, g):
+ if count == 0:
+ return 1
+ else:
+ checks[pattern] = partial(expect_count, count-1)
+ return 0
+
+def run_test(name, actions, checks, driver_class, jobs):
+ code = 0
+
+ # Delete any stale node records
+ api = arvados.api('v1')
+ for n in api.nodes().list().execute()['items']:
+ api.nodes().delete(uuid=n["uuid"]).execute()
+
+ logging.info("Start %s", name)
+
+ global fake_slurm
+ fake_slurm = tempfile.mkdtemp()
+ logging.info("fake_slurm is %s", fake_slurm)
+
+ global compute_nodes
+ compute_nodes = {}
+
+ global all_jobs
+ all_jobs = jobs
+
+ env = os.environ.copy()
+ env["PATH"] = fake_slurm + ":" + env["PATH"]
+
+ # Reset fake squeue/sinfo to empty
+ update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n")
+ update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n")
+
+ # Write configuration file for test
+ with open("tests/fake.cfg.template") as f:
+ with open(os.path.join(fake_slurm, "id_rsa.pub"), "w") as ssh:
+ pass
+ with open(os.path.join(fake_slurm, "fake.cfg"), "w") as cfg:
+ cfg.write(f.read().format(host=os.environ["ARVADOS_API_HOST"],
+ token=os.environ["ARVADOS_API_TOKEN"],
+ driver_class=driver_class,
+ ssh_key=os.path.join(fake_slurm, "id_rsa.pub")))
+
+ # Tests must complete in less than 3 minutes.
+ timeout = time.time() + 180
+ terminated = False
+
+ # Now start node manager
+ p = subprocess.Popen(["bin/arvados-node-manager", "--foreground", "--config", os.path.join(fake_slurm, "fake.cfg")],
+ bufsize=0, stderr=subprocess.PIPE, env=env)
+
+ # Test main loop:
+ # - Read line
+    # - Apply negative checks (things that are not supposed to happen)
+ # - Check timeout
+ # - Check if the next action should trigger
+ # - If all actions are exhausted, terminate with test success
+ # - If it hits timeout with actions remaining, terminate with test failed
+ try:
+ # naive line iteration over pipes gets buffered, which isn't what we want,
+ # see https://bugs.python.org/issue3907
+ for line in iter(p.stderr.readline, ""):
+ sys.stdout.write(line)
+
+ for k,v in checks.items():
+ g = re.match(k, line)
+ if g:
+ logging.info("Matched check %s", k)
+ code += v(checks, k, g)
+ if code != 0:
+ logging.error("Check failed")
+ if not terminated:
+ p.terminate()
+ terminated = True
+
+ if terminated:
+ continue
+
+ if time.time() > timeout:
+ logging.error("Exceeded timeout with actions remaining: %s", actions)
+ code += 1
+ if not terminated:
+ p.terminate()
+ terminated = True
+
+ k, v = actions[0]
+ g = re.match(k, line)
+ if g:
+ logging.info("Matched action %s", k)
+ actions.pop(0)
+ code += v(g)
+ if code != 0:
+ logging.error("Action failed")
+ p.terminate()
+ terminated = True
+
+ if not actions:
+ p.terminate()
+ terminated = True
+ except KeyboardInterrupt:
+ p.kill()
+
+ if actions:
+ logging.error("Ended with remaining actions: %s", actions)
+ code = 1
+
+ shutil.rmtree(fake_slurm)
+
+ if code == 0:
+ logging.info("%s passed", name)
+ else:
+ logging.info("%s failed", name)
+
+ return code
+
+
+def main():
+    # Test lifecycle. Each entry maps a test name to (actions, checks,
+    # driver_class, jobs): an ordered list of (regex, callback) actions, a
+    # dict of negative/count checks keyed by regex, the fake cloud driver
+    # class to load, and the initial fake job queue.
+
+ tests = {
+ "test_single_node": (
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ ], {
+ r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
+ r".*Setting node quota.*": fail,
+ },
+ "arvnodeman.test.fake_driver.FakeDriver",
+ {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"}),
+ "test_multiple_nodes": (
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ ], {
+ r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 4),
+ r".*Setting node quota.*": fail,
+ },
+ "arvnodeman.test.fake_driver.FakeDriver",
+ {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+ }),
+ "test_hit_quota": (
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*setting node quota to 3", noop),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown)
+ ], {
+ r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 2),
+ r".*Sending create_node request.*": partial(expect_count, 5)
+ },
+ "arvnodeman.test.fake_driver.QuotaDriver",
+ {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+ }),
+ "test_probe_quota": (
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*setting node quota to 3", noop),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*sending request", jobs_req),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+ ], {
+ r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 6),
+ r".*Sending create_node request.*": partial(expect_count, 9)
+ },
+ "arvnodeman.test.fake_driver.QuotaDriver",
+ {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+ })
+ }
+
+ code = 0
+ if len(sys.argv) > 1:
+ code = run_test(sys.argv[1], *tests[sys.argv[1]])
+ else:
+ for t in sorted(tests.keys()):
+ code += run_test(t, *tests[t])
+
+ if code == 0:
+ logging.info("Tests passed")
+ else:
+ logging.info("Tests failed")
+
+    sys.exit(code)
+
+if __name__ == '__main__':
+ main()
self.make_actor()
self.wait_for_assignment(self.setup_actor, 'cloud_node')
- def test_unknown_basehttperror_not_retried(self):
+ def test_basehttperror_retried(self):
self.make_mocks()
self.cloud_client.create_node.side_effect = [
- BaseHTTPError(400, "Unknown"),
+ BaseHTTPError(500, "Try again"),
self.cloud_client.create_node.return_value,
]
self.make_actor()
- finished = threading.Event()
- self.setup_actor.subscribe(lambda _: finished.set())
- assert(finished.wait(self.TIMEOUT))
- self.assertEqual(0, self.cloud_client.post_create_node.call_count)
+ self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ self.assertEqual(1, self.cloud_client.post_create_node.call_count)
- def test_known_basehttperror_retried(self):
+ def test_instance_exceeded_not_retried(self):
self.make_mocks()
self.cloud_client.create_node.side_effect = [
BaseHTTPError(400, "InstanceLimitExceeded"),
self.cloud_client.create_node.return_value,
]
self.make_actor()
- self.wait_for_assignment(self.setup_actor, 'cloud_node')
- self.assertEqual(1, self.cloud_client.post_create_node.call_count)
+ done = self.FUTURE_CLASS()
+ self.setup_actor.subscribe(done.set)
+ done.get(self.TIMEOUT)
+ self.assertEqual(0, self.cloud_client.post_create_node.call_count)
def test_failed_post_create_retried(self):
self.make_mocks()
cloud_node = testutil.cloud_node_mock(4)
arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
setup = self.start_node_boot(cloud_node, arv_node)
- self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
self.daemon.update_arvados_nodes([arv_node])
self.daemon.update_cloud_nodes([cloud_node])
cloud_node = testutil.cloud_node_mock(1)
setup = self.start_node_boot(cloud_node)
self.daemon.update_cloud_nodes([cloud_node])
- self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
def test_no_duplication_when_booted_node_listed(self):
cloud_node = testutil.cloud_node_mock(2)
setup = self.start_node_boot(cloud_node, id_num=2)
- self.daemon.node_up(setup)
+ self.daemon.node_setup_finished(setup)
self.daemon.update_cloud_nodes([cloud_node]).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
# even it doesn't appear in the listing (e.g., because of delays
# propagating tags).
setup = self.start_node_boot()
- self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
def test_booted_unlisted_node_counted(self):
setup = self.start_node_boot(id_num=1)
- self.daemon.node_up(setup)
+ self.daemon.node_setup_finished(setup)
self.daemon.update_server_wishlist(
[testutil.MockSize(1)]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
def test_booted_node_can_shutdown(self):
setup = self.start_node_boot()
- self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.update_server_wishlist([])
def test_booted_node_lifecycle(self):
cloud_node = testutil.cloud_node_mock(6)
setup = self.start_node_boot(cloud_node, id_num=6)
- self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.update_server_wishlist([])
def test_booted_node_shut_down_when_never_listed(self):
setup = self.start_node_boot()
self.cloud_factory().node_start_time.return_value = time.time() - 3601
- self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
self.assertFalse(self.node_shutdown.start.called)
now = time.time()
cloud_node = testutil.cloud_node_mock(2)
setup = self.start_node_boot(cloud_node)
self.cloud_factory().node_start_time.return_value = time.time() - 3601
- self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
self.daemon.update_cloud_nodes([cloud_node])
self.monitor_list()[0].tell_proxy().consider_shutdown()
arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
setup = self.start_node_boot(cloud_node, arv_node)
self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
- self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
self.daemon.update_cloud_nodes([cloud_node])
cloud_node = testutil.cloud_node_mock(3)
arv_node = testutil.arvados_node_mock(3)
setup = self.start_node_boot(cloud_node, arv_node)
- self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
self.daemon.update_cloud_nodes([cloud_node])
self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
cloud_node = testutil.cloud_node_mock(5)
arv_node = testutil.arvados_node_mock(5, job_uuid=True)
setup = self.start_node_boot(cloud_node, arv_node)
- self.daemon.node_up(setup).get(self.TIMEOUT)
+ self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
self.assertEqual(1, self.alive_monitor_count())
self.daemon.update_cloud_nodes([cloud_node])
self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
new_node.stop_if_no_cloud_node.reset_mock()
self.daemon.shutdown().get(self.TIMEOUT)
self.assertTrue(new_node.stop_if_no_cloud_node.called)
- self.daemon.node_up(new_node).get(self.TIMEOUT)
+ self.daemon.node_setup_finished(new_node).get(self.TIMEOUT)
self.assertTrue(new_node.stop.called)
self.timer.deliver()
self.assertTrue(
def test_subscribers_get_server_lists(self, mock_squeue):
mock_squeue.return_value = ""
- self.build_monitor([{'items': [1, 2]}], self.MockCalculator())
+ self.build_monitor([{'items': [1, 2]}], self.MockCalculator(), True, True)
self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
self.stop_proxy(self.monitor)
self.subscriber.assert_called_with([testutil.MockSize(1),
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
- [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]))
+ [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]),
+ True, True)
self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
self.stop_proxy(self.monitor)
self.subscriber.assert_called_with([testutil.MockSize(1),
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
- [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]))
+ [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]),
+ True, True)
self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
self.stop_proxy(self.monitor)
self.subscriber.assert_called_with([testutil.MockSize(1),