X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/abe8fc4c5818165ffb696448c81b3bf1bb5dc1dd..080c940d7a8134a6e277a53b7e45eb27e2b2c87f:/services/nodemanager/arvnodeman/config.py diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py index 24fd828cf5..4fda7e76d6 100644 --- a/services/nodemanager/arvnodeman/config.py +++ b/services/nodemanager/arvnodeman/config.py @@ -1,26 +1,34 @@ #!/usr/bin/env python +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 from __future__ import absolute_import, print_function import ConfigParser import importlib import logging -import ssl +import sys import arvados import httplib2 -import libcloud.common.types as cloud_types import pykka from apiclient import errors as apierror -# IOError is the base class for socket.error and friends. +from .baseactor import BaseNodeManagerActor + +from functools import partial +from libcloud.common.types import LibcloudError +from libcloud.common.exceptions import BaseHTTPError + +# IOError is the base class for socket.error, ssl.SSLError, and friends. # It seems like it hits the sweet spot for operations we want to retry: # it's low-level, but unlikely to catch code bugs. -NETWORK_ERRORS = (IOError, ssl.SSLError) +NETWORK_ERRORS = (IOError,) ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,) -CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,) +CLOUD_ERRORS = NETWORK_ERRORS + (LibcloudError, BaseHTTPError) -actor_class = pykka.ThreadingActor +actor_class = BaseNodeManagerActor class NodeManagerConfig(ConfigParser.SafeConfigParser): """Node Manager Configuration class. @@ -36,15 +44,26 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser): ConfigParser.SafeConfigParser.__init__(self, *args, **kwargs) for sec_name, settings in { 'Arvados': {'insecure': 'no', - 'timeout': '15'}, + 'timeout': '15', + 'jobs_queue': 'yes', + 'slurm_queue': 'yes' + }, 'Daemon': {'min_nodes': '0', 'max_nodes': '1', 'poll_time': '60', 'max_poll_time': '300', 'poll_stale_after': '600', - 'node_stale_after': str(60 * 60 * 2)}, + 'max_total_price': '0', + 'boot_fail_after': str(sys.maxint), + 'node_stale_after': str(60 * 60 * 2), + 'watchdog': '600', + 'node_mem_scaling': '0.95', + 'consecutive_idle_count': '2'}, + 'Manage': {'address': '127.0.0.1', + 'port': '-1', + 'ManagementToken': ''}, 'Logging': {'file': '/dev/stderr', - 'level': 'WARNING'}, + 'level': 'WARNING'} }.iteritems(): if not self.has_section(sec_name): self.add_section(sec_name) @@ -52,12 +71,23 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser): if not self.has_option(sec_name, opt_name): self.set(sec_name, opt_name, value) - def get_section(self, section, transformer=None): + def get_section(self, section, transformers={}, default_transformer=None): + transformer_map = { + str: self.get, + int: self.getint, + bool: self.getboolean, + float: self.getfloat, + } result = self._dict() for key, value in self.items(section): + transformer = None + if transformers.get(key) in transformer_map: + transformer = partial(transformer_map[transformers[key]], section) + elif default_transformer in transformer_map: + transformer = partial(transformer_map[default_transformer], section) if transformer is not None: try: - value = transformer(value) + value = transformer(key) except (TypeError, ValueError): pass result[key] = value @@ -68,6 +98,17 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser): for key in self.options('Logging') if key not in self.LOGGING_NONLEVELS} + def dispatch_classes(self): + mod_name = 'arvnodeman.computenode.dispatch' + if self.has_option('Daemon', 'dispatcher'): + mod_name = '{}.{}'.format(mod_name, + self.get('Daemon', 'dispatcher')) + module = importlib.import_module(mod_name) + return (module.ComputeNodeSetupActor, + module.ComputeNodeShutdownActor, + module.ComputeNodeUpdateActor, + module.ComputeNodeMonitorActor) + def new_arvados_client(self): if self.has_option('Daemon', 'certs_file'): certs_file = self.get('Daemon', 'certs_file') @@ -77,8 +118,7 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser): http = httplib2.Http(timeout=self.getint('Arvados', 'timeout'), ca_certs=certs_file, disable_ssl_certificate_validation=insecure) - return arvados.api('v1', - cache=False, # Don't reuse an existing client. + return arvados.api(version='v1', host=self.get('Arvados', 'host'), token=self.get('Arvados', 'token'), insecure=insecure, @@ -87,23 +127,55 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser): def new_cloud_client(self): module = importlib.import_module('arvnodeman.computenode.driver.' + self.get('Cloud', 'provider')) + driver_class = module.ComputeNodeDriver.DEFAULT_DRIVER + if self.has_option('Cloud', 'driver_class'): + d = self.get('Cloud', 'driver_class').split('.') + mod = '.'.join(d[:-1]) + cls = d[-1] + driver_class = importlib.import_module(mod).__dict__[cls] auth_kwargs = self.get_section('Cloud Credentials') if 'timeout' in auth_kwargs: auth_kwargs['timeout'] = int(auth_kwargs['timeout']) return module.ComputeNodeDriver(auth_kwargs, self.get_section('Cloud List'), - self.get_section('Cloud Create')) + self.get_section('Cloud Create'), + driver_class=driver_class) + + def node_sizes(self): + """Finds all acceptable NodeSizes for our installation. - def node_sizes(self, all_sizes): + Returns a list of (NodeSize, kwargs) pairs for each NodeSize object + returned by libcloud that matches a size listed in our config file. + """ + all_sizes = self.new_cloud_client().list_sizes() size_kwargs = {} + section_types = { + 'instance_type': str, + 'price': float, + 'preemptible': bool, + } for sec_name in self.sections(): sec_words = sec_name.split(None, 2) if sec_words[0] != 'Size': continue - size_kwargs[sec_words[1]] = self.get_section(sec_name, int) - return [(size, size_kwargs[size.id]) for size in all_sizes - if size.id in size_kwargs] + size_spec = self.get_section(sec_name, section_types, int) + if 'preemptible' not in size_spec: + size_spec['preemptible'] = False + if 'instance_type' not in size_spec: + # Assume instance type is Size name if missing + size_spec['instance_type'] = sec_words[1] + size_spec['id'] = sec_words[1] + size_kwargs[sec_words[1]] = size_spec + # EC2 node sizes are identified by id. GCE sizes are identified by name. + matching_sizes = [] + for size in all_sizes: + matching_sizes += [ + (size, size_kwargs[s]) for s in size_kwargs + if size_kwargs[s]['instance_type'] == size.id + or size_kwargs[s]['instance_type'] == size.name + ] + return matching_sizes def shutdown_windows(self): - return [int(n) + return [float(n) for n in self.get('Cloud', 'shutdown_windows').split(',')]